You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ey...@apache.org on 2019/03/07 21:48:48 UTC
[hadoop] branch trunk updated: YARN-9341. Fixed reentrant lock
usage in YARN project. Contributed by Prabhu Joseph
This is an automated email from the ASF dual-hosted git repository.
eyang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 39b4a37 YARN-9341. Fixed reentrant lock usage in YARN project. Contributed by Prabhu Joseph
39b4a37 is described below
commit 39b4a37e02e929a698fcf9e32f1f71bb6b977635
Author: Eric Yang <ey...@apache.org>
AuthorDate: Thu Mar 7 16:47:45 2019 -0500
YARN-9341. Fixed reentrant lock usage in YARN project.
Contributed by Prabhu Joseph
---
.../apache/hadoop/yarn/service/ServiceManager.java | 2 +-
.../hadoop/yarn/service/component/Component.java | 2 +-
.../component/instance/ComponentInstance.java | 10 +--
.../client/api/impl/FileSystemTimelineWriter.java | 46 +++++++-------
.../yarn/nodelabels/CommonNodeLabelsManager.java | 20 +++---
.../nodelabels/NonAppendableFSNodeLabelStore.java | 6 +-
.../yarn/security/ConfiguredYarnAuthorizer.java | 4 +-
.../yarn/server/timeline/LeveldbTimelineStore.java | 6 +-
.../yarn/server/nodemanager/ContainerExecutor.java | 9 ++-
.../application/ApplicationImpl.java | 2 +-
.../containermanager/container/ContainerImpl.java | 5 +-
.../linux/resources/CGroupsHandlerImpl.java | 9 ++-
.../localizer/LocalizedResource.java | 5 +-
.../logaggregation/TestLogAggregationService.java | 2 +-
.../monitor/capacity/FifoCandidatesSelector.java | 2 +-
.../capacity/IntraQueueCandidatesSelector.java | 2 +-
.../ProportionalCapacityPreemptionPolicy.java | 5 +-
.../nodelabels/NodeAttributesManagerImpl.java | 17 +++--
.../nodelabels/RMNodeLabelsManager.java | 35 +++++------
.../placement/PlacementManager.java | 6 +-
.../server/resourcemanager/rmapp/RMAppImpl.java | 8 +--
.../rmapp/attempt/RMAppAttemptImpl.java | 6 +-
.../rmapp/attempt/RMAppAttemptMetrics.java | 4 +-
.../rmcontainer/RMContainerImpl.java | 27 ++++----
.../server/resourcemanager/rmnode/RMNodeImpl.java | 5 +-
.../scheduler/AbstractResourceUsage.java | 14 ++---
.../scheduler/AbstractYarnScheduler.java | 10 +--
.../scheduler/AppSchedulingInfo.java | 27 ++++----
.../resourcemanager/scheduler/ResourceUsage.java | 4 +-
.../scheduler/SchedulerApplicationAttempt.java | 53 ++++++++--------
.../capacity/AbstractAutoCreatedLeafQueue.java | 2 +-
.../scheduler/capacity/AbstractCSQueue.java | 23 ++++---
.../capacity/AbstractManagedParentQueue.java | 13 ++--
.../scheduler/capacity/AutoCreatedLeafQueue.java | 5 +-
.../scheduler/capacity/CapacityScheduler.java | 66 ++++++++++----------
.../scheduler/capacity/LeafQueue.java | 72 +++++++++++-----------
.../scheduler/capacity/ManagedParentQueue.java | 16 ++---
.../scheduler/capacity/ParentQueue.java | 39 ++++++------
.../scheduler/capacity/PlanQueue.java | 2 +-
.../scheduler/capacity/QueueCapacities.java | 12 ++--
.../scheduler/capacity/ReservationQueue.java | 2 +-
.../scheduler/capacity/UsersManager.java | 49 ++++++++-------
.../capacity/preemption/PreemptionManager.java | 12 ++--
.../GuaranteedOrZeroCapacityOverTimePolicy.java | 23 ++++---
.../scheduler/common/fica/FiCaSchedulerApp.java | 28 ++++-----
.../MemoryPlacementConstraintManager.java | 22 +++----
.../scheduler/fair/FSAppAttempt.java | 21 +++----
.../scheduler/fifo/FifoAppAttempt.java | 4 +-
.../placement/LocalityAppPlacementAllocator.java | 14 ++---
.../SingleConstraintAppPlacementAllocator.java | 4 +-
.../security/NMTokenSecretManagerInRM.java | 12 ++--
.../volume/csi/lifecycle/VolumeImpl.java | 6 +-
52 files changed, 388 insertions(+), 412 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java
index aefdadd..3c8fed6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java
@@ -117,8 +117,8 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
@Override
public void handle(ServiceEvent event) {
+ writeLock.lock();
try {
- writeLock.lock();
State oldState = getState();
try {
stateMachine.doTransition(event.getType(), event);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
index 8958dc7..cbc489c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
@@ -1090,8 +1090,8 @@ public class Component implements EventHandler<ComponentEvent> {
@Override
public void handle(ComponentEvent event) {
+ writeLock.lock();
try {
- writeLock.lock();
ComponentState oldState = getState();
try {
stateMachine.doTransition(event.getType(), event);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
index f44cd6e..700408e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
@@ -751,8 +751,8 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
@Override
public void handle(ComponentInstanceEvent event) {
+ writeLock.lock();
try {
- writeLock.lock();
ComponentInstanceState oldState = getState();
try {
stateMachine.doTransition(event.getType(), event);
@@ -782,8 +782,8 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
void updateLocalizationStatuses(
List<org.apache.hadoop.yarn.api.records.LocalizationStatus> statuses) {
Map<String, String> resourcesCpy = new HashMap<>();
+ readLock.lock();
try {
- readLock.lock();
if (resolvedParams == null || resolvedParams.didLaunchFail() ||
resolvedParams.getResolvedRsrcPaths() == null ||
resolvedParams.getResolvedRsrcPaths().isEmpty()) {
@@ -823,8 +823,8 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
public void updateResolvedLaunchParams(
Future<ProviderService.ResolvedLaunchParams> future) {
+ writeLock.lock();
try {
- writeLock.lock();
this.resolvedParams = future.get();
} catch (InterruptedException | ExecutionException e) {
LOG.error("{} updating resolved params", getCompInstanceId(), e);
@@ -834,8 +834,8 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
}
public ContainerStatus getContainerStatus() {
+ readLock.lock();
try {
- readLock.lock();
return status;
} finally {
readLock.unlock();
@@ -844,8 +844,8 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
private void setContainerStatus(ContainerId containerId,
ContainerStatus latestStatus) {
+ writeLock.lock();
try {
- writeLock.lock();
this.status = latestStatus;
org.apache.hadoop.yarn.service.api.records.Container containerRec =
getCompSpec().getContainer(containerId.toString());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
index bc5e987..ace5fdf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
@@ -478,8 +478,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
@Override
public void flush() throws IOException {
+ this.domainFDLocker.lock();
try {
- this.domainFDLocker.lock();
if (domainLogFD != null) {
domainLogFD.flush();
}
@@ -494,8 +494,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
private Map<ApplicationAttemptId, EntityLogFD> copySummaryLogFDs(
Map<ApplicationAttemptId, EntityLogFD> summanyLogFDsToCopy) {
+ summaryTableCopyLocker.lock();
try {
- summaryTableCopyLocker.lock();
return new HashMap<ApplicationAttemptId, EntityLogFD>(
summanyLogFDsToCopy);
} finally {
@@ -506,8 +506,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
private Map<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
EntityLogFD>> copyEntityLogFDs(Map<ApplicationAttemptId,
HashMap<TimelineEntityGroupId, EntityLogFD>> entityLogFDsToCopy) {
+ entityTableCopyLocker.lock();
try {
- entityTableCopyLocker.lock();
return new HashMap<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
EntityLogFD>>(entityLogFDsToCopy);
} finally {
@@ -521,8 +521,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
for (Entry<ApplicationAttemptId, EntityLogFD> logFDEntry : logFDs
.entrySet()) {
EntityLogFD logFD = logFDEntry.getValue();
+ logFD.lock();
try {
- logFD.lock();
logFD.flush();
} finally {
logFD.unlock();
@@ -541,8 +541,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
for (Entry<TimelineEntityGroupId, EntityLogFD> logFDEntry
: logFDMap.entrySet()) {
EntityLogFD logFD = logFDEntry.getValue();
+ logFD.lock();
try {
- logFD.lock();
logFD.flush();
} finally {
logFD.unlock();
@@ -567,8 +567,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
private void cleanInActiveFDs() {
long currentTimeStamp = Time.monotonicNow();
+ this.domainFDLocker.lock();
try {
- this.domainFDLocker.lock();
if (domainLogFD != null) {
if (currentTimeStamp - domainLogFD.getLastModifiedTime() >= ttl) {
domainLogFD.close();
@@ -593,8 +593,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
for (Entry<ApplicationAttemptId, EntityLogFD> logFDEntry : logFDs
.entrySet()) {
EntityLogFD logFD = logFDEntry.getValue();
+ logFD.lock();
try {
- logFD.lock();
if (currentTimeStamp - logFD.getLastModifiedTime() >= ttl) {
logFD.close();
}
@@ -617,8 +617,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
for (Entry<TimelineEntityGroupId, EntityLogFD> logFDEntry
: logFDMap.entrySet()) {
EntityLogFD logFD = logFDEntry.getValue();
+ logFD.lock();
try {
- logFD.lock();
if (currentTimeStamp - logFD.getLastModifiedTime() >= ttl) {
logFD.close();
}
@@ -644,8 +644,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
private class TimerMonitorTask extends TimerTask {
@Override
public void run() {
+ timerTasksMonitorWriteLock.lock();
try {
- timerTasksMonitorWriteLock.lock();
monitorTimerTasks();
} finally {
timerTasksMonitorWriteLock.unlock();
@@ -691,8 +691,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
monitorTaskTimer = null;
}
+ this.domainFDLocker.lock();
try {
- this.domainFDLocker.lock();
if (domainLogFD != null) {
domainLogFD.close();
domainLogFD = null;
@@ -708,8 +708,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
private void closeEntityFDs(Map<ApplicationAttemptId,
HashMap<TimelineEntityGroupId, EntityLogFD>> logFDs) {
+ entityTableLocker.lock();
try {
- entityTableLocker.lock();
if (!logFDs.isEmpty()) {
for (Entry<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
EntityLogFD>> logFDMapEntry : logFDs.entrySet()) {
@@ -734,8 +734,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
private void closeSummaryFDs(
Map<ApplicationAttemptId, EntityLogFD> logFDs) {
+ summaryTableLocker.lock();
try {
- summaryTableLocker.lock();
if (!logFDs.isEmpty()) {
for (Entry<ApplicationAttemptId, EntityLogFD> logFDEntry
: logFDs.entrySet()) {
@@ -757,8 +757,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
ObjectMapper objMapper, TimelineDomain domain,
boolean isAppendSupported) throws IOException {
checkAndStartTimeTasks();
+ this.domainFDLocker.lock();
try {
- this.domainFDLocker.lock();
if (this.domainLogFD != null) {
this.domainLogFD.writeDomain(domain);
} else {
@@ -790,8 +790,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
if (logMapFD != null) {
EntityLogFD logFD = logMapFD.get(groupId);
if (logFD != null) {
+ logFD.lock();
try {
- logFD.lock();
if (serviceStopped) {
return;
}
@@ -814,8 +814,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
TimelineEntityGroupId groupId, List<TimelineEntity> entities,
boolean isAppendSupported, Map<ApplicationAttemptId, HashMap<
TimelineEntityGroupId, EntityLogFD>> logFDs) throws IOException{
+ entityTableLocker.lock();
try {
- entityTableLocker.lock();
if (serviceStopped) {
return;
}
@@ -828,11 +828,11 @@ public class FileSystemTimelineWriter extends TimelineWriter{
if (logFD == null) {
logFD = new EntityLogFD(fs, logPath, objMapper, isAppendSupported);
}
+ logFD.lock();
try {
- logFD.lock();
logFD.writeEntities(entities);
+ entityTableCopyLocker.lock();
try {
- entityTableCopyLocker.lock();
logFDMap.put(groupId, logFD);
logFDs.put(attemptId, logFDMap);
} finally {
@@ -862,8 +862,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
EntityLogFD logFD = null;
logFD = logFDs.get(attemptId);
if (logFD != null) {
+ logFD.lock();
try {
- logFD.lock();
if (serviceStopped) {
return;
}
@@ -881,8 +881,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
ObjectMapper objMapper, ApplicationAttemptId attemptId,
List<TimelineEntity> entities, boolean isAppendSupported,
Map<ApplicationAttemptId, EntityLogFD> logFDs) throws IOException {
+ summaryTableLocker.lock();
try {
- summaryTableLocker.lock();
if (serviceStopped) {
return;
}
@@ -890,11 +890,11 @@ public class FileSystemTimelineWriter extends TimelineWriter{
if (logFD == null) {
logFD = new EntityLogFD(fs, logPath, objMapper, isAppendSupported);
}
+ logFD.lock();
try {
- logFD.lock();
logFD.writeEntities(entities);
+ summaryTableCopyLocker.lock();
try {
- summaryTableCopyLocker.lock();
logFDs.put(attemptId, logFD);
} finally {
summaryTableCopyLocker.unlock();
@@ -928,12 +928,12 @@ public class FileSystemTimelineWriter extends TimelineWriter{
}
private void checkAndStartTimeTasks() {
+ this.timerTasksMonitorReadLock.lock();
try {
- this.timerTasksMonitorReadLock.lock();
this.timeStampOfLastWrite = Time.monotonicNow();
if(!timerTaskStarted) {
+ timerTaskLocker.lock();
try {
- timerTaskLocker.lock();
if (!timerTaskStarted) {
createAndStartTimerTasks();
timerTaskStarted = true;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java
index e5b3d63..c706989 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java
@@ -766,8 +766,8 @@ public class CommonNodeLabelsManager extends AbstractService {
@SuppressWarnings("unchecked")
private <T> Map<NodeId, Set<T>> generateNodeLabelsInfoPerNode(Class<T> type) {
+ readLock.lock();
try {
- readLock.lock();
Map<NodeId, Set<T>> nodeToLabels = new HashMap<>();
for (Entry<String, Host> entry : nodeCollections.entrySet()) {
String hostName = entry.getKey();
@@ -809,8 +809,8 @@ public class CommonNodeLabelsManager extends AbstractService {
* @return set of nodes with no labels
*/
public Set<NodeId> getNodesWithoutALabel() {
+ readLock.lock();
try {
- readLock.lock();
Set<NodeId> nodes = new HashSet<>();
for (Host host : nodeCollections.values()) {
for (NodeId nodeId : host.nms.keySet()) {
@@ -832,8 +832,8 @@ public class CommonNodeLabelsManager extends AbstractService {
* @return labels to nodes map
*/
public Map<String, Set<NodeId>> getLabelsToNodes() {
+ readLock.lock();
try {
- readLock.lock();
return getLabelsToNodes(labelCollections.keySet());
} finally {
readLock.unlock();
@@ -848,8 +848,8 @@ public class CommonNodeLabelsManager extends AbstractService {
* @return labels to nodes map
*/
public Map<String, Set<NodeId>> getLabelsToNodes(Set<String> labels) {
+ readLock.lock();
try {
- readLock.lock();
Map<String, Set<NodeId>> labelsToNodes = getLabelsToNodesMapping(labels,
String.class);
return Collections.unmodifiableMap(labelsToNodes);
@@ -865,8 +865,8 @@ public class CommonNodeLabelsManager extends AbstractService {
* @return labels to nodes map
*/
public Map<NodeLabel, Set<NodeId>> getLabelsInfoToNodes() {
+ readLock.lock();
try {
- readLock.lock();
return getLabelsInfoToNodes(labelCollections.keySet());
} finally {
readLock.unlock();
@@ -882,8 +882,8 @@ public class CommonNodeLabelsManager extends AbstractService {
* @return labels to nodes map
*/
public Map<NodeLabel, Set<NodeId>> getLabelsInfoToNodes(Set<String> labels) {
+ readLock.lock();
try {
- readLock.lock();
Map<NodeLabel, Set<NodeId>> labelsToNodes = getLabelsToNodesMapping(
labels, NodeLabel.class);
return Collections.unmodifiableMap(labelsToNodes);
@@ -922,8 +922,8 @@ public class CommonNodeLabelsManager extends AbstractService {
* @return existing valid labels in repository
*/
public Set<String> getClusterNodeLabelNames() {
+ readLock.lock();
try {
- readLock.lock();
Set<String> labels = new HashSet<String>(labelCollections.keySet());
labels.remove(NO_LABEL);
return Collections.unmodifiableSet(labels);
@@ -933,8 +933,8 @@ public class CommonNodeLabelsManager extends AbstractService {
}
public List<NodeLabel> getClusterNodeLabels() {
+ readLock.lock();
try {
- readLock.lock();
List<NodeLabel> nodeLabels = new ArrayList<>();
for (RMNodeLabel label : labelCollections.values()) {
if (!label.getLabelName().equals(NO_LABEL)) {
@@ -952,8 +952,8 @@ public class CommonNodeLabelsManager extends AbstractService {
if (nodeLabel.equals(NO_LABEL)) {
return noNodeLabel.getIsExclusive();
}
+ readLock.lock();
try {
- readLock.lock();
RMNodeLabel label = labelCollections.get(nodeLabel);
if (label == null) {
String message =
@@ -1048,8 +1048,8 @@ public class CommonNodeLabelsManager extends AbstractService {
}
public Set<NodeLabel> getLabelsInfoByNode(NodeId nodeId) {
+ readLock.lock();
try {
- readLock.lock();
Set<String> labels = getLabelsByNode(nodeId, nodeCollections);
if (labels.isEmpty()) {
return EMPTY_NODELABEL_SET;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NonAppendableFSNodeLabelStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NonAppendableFSNodeLabelStore.java
index 9e90f33..afacac7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NonAppendableFSNodeLabelStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NonAppendableFSNodeLabelStore.java
@@ -93,10 +93,10 @@ public class NonAppendableFSNodeLabelStore extends FileSystemNodeLabelsStore {
private void writeNewMirror() throws IOException {
ReentrantReadWriteLock.ReadLock readLock = manager.readLock;
+ // Acquire readlock to make sure we get cluster node labels and
+ // node-to-labels mapping atomically.
+ readLock.lock();
try {
- // Acquire readlock to make sure we get cluster node labels and
- // node-to-labels mapping atomically.
- readLock.lock();
// Write mirror to mirror.new.tmp file
Path newTmpPath = new Path(fsWorkingPath, MIRROR_FILENAME + ".new.tmp");
try (FSDataOutputStream os = fs.create(newTmpPath, true)) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ConfiguredYarnAuthorizer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ConfiguredYarnAuthorizer.java
index 36c5214..615ecb0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ConfiguredYarnAuthorizer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ConfiguredYarnAuthorizer.java
@@ -57,8 +57,8 @@ public class ConfiguredYarnAuthorizer extends YarnAuthorizationProvider {
@Override
public void setPermission(List<Permission> permissions,
UserGroupInformation user) {
+ writeLock.lock();
try {
- writeLock.lock();
for (Permission perm : permissions) {
allAcls.put(perm.getTarget(), perm.getAcls());
}
@@ -94,8 +94,8 @@ public class ConfiguredYarnAuthorizer extends YarnAuthorizationProvider {
@Override
public boolean checkPermission(AccessRequest accessRequest) {
+ readLock.lock();
try {
- readLock.lock();
return checkPermissionInternal(accessRequest.getAccessType(),
accessRequest.getEntity(), accessRequest.getUser());
} finally {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java
index e3db1dc..c9ce936 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java
@@ -986,8 +986,8 @@ public class LeveldbTimelineStore extends AbstractService
@Override
public TimelinePutResponse put(TimelineEntities entities) {
+ deleteLock.readLock().lock();
try {
- deleteLock.readLock().lock();
TimelinePutResponse response = new TimelinePutResponse();
for (TimelineEntity entity : entities.getEntities()) {
put(entity, response, false);
@@ -1001,8 +1001,8 @@ public class LeveldbTimelineStore extends AbstractService
@Private
@VisibleForTesting
public TimelinePutResponse putWithNoDomainId(TimelineEntities entities) {
+ deleteLock.readLock().lock();
try {
- deleteLock.readLock().lock();
TimelinePutResponse response = new TimelinePutResponse();
for (TimelineEntity entity : entities.getEntities()) {
put(entity, response, true);
@@ -1525,8 +1525,8 @@ public class LeveldbTimelineStore extends AbstractService
LeveldbIterator iterator = null;
LeveldbIterator pfIterator = null;
long typeCount = 0;
+ deleteLock.writeLock().lock();
try {
- deleteLock.writeLock().lock();
iterator = getDbIterator(false);
pfIterator = getDbIterator(false);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
index a87c494..61e4364 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
@@ -569,8 +569,8 @@ public abstract class ContainerExecutor implements Configurable {
* @return the path of the pid-file for the given containerId.
*/
protected Path getPidFilePath(ContainerId containerId) {
+ readLock.lock();
try {
- readLock.lock();
return (this.pidFiles.get(containerId));
} finally {
readLock.unlock();
@@ -720,9 +720,8 @@ public abstract class ContainerExecutor implements Configurable {
* @return true if the container is active
*/
protected boolean isContainerActive(ContainerId containerId) {
+ readLock.lock();
try {
- readLock.lock();
-
return (this.pidFiles.containsKey(containerId));
} finally {
readLock.unlock();
@@ -742,8 +741,8 @@ public abstract class ContainerExecutor implements Configurable {
* of the launched process
*/
public void activateContainer(ContainerId containerId, Path pidFilePath) {
+ writeLock.lock();
try {
- writeLock.lock();
this.pidFiles.put(containerId, pidFilePath);
} finally {
writeLock.unlock();
@@ -778,8 +777,8 @@ public abstract class ContainerExecutor implements Configurable {
* @param containerId the container ID
*/
public void deactivateContainer(ContainerId containerId) {
+ writeLock.lock();
try {
- writeLock.lock();
this.pidFiles.remove(containerId);
} finally {
writeLock.unlock();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
index ad995fb..5f02e33 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
@@ -667,8 +667,8 @@ public class ApplicationImpl implements Application {
@VisibleForTesting
public LogAggregationContext getLogAggregationContext() {
+ this.readLock.lock();
try {
- this.readLock.lock();
return this.logAggregationContext;
} finally {
this.readLock.unlock();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index 8aa8d07..d25206c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -953,8 +953,8 @@ public class ContainerImpl implements Container {
@Override
public void setIpAndHost(String[] ipAndHost) {
+ this.writeLock.lock();
try {
- this.writeLock.lock();
this.ips = ipAndHost[0];
this.host = ipAndHost[1];
} finally {
@@ -2107,9 +2107,8 @@ public class ContainerImpl implements Container {
@Override
public void handle(ContainerEvent event) {
+ this.writeLock.lock();
try {
- this.writeLock.lock();
-
ContainerId containerID = event.getContainerID();
if (LOG.isDebugEnabled()) {
LOG.debug("Processing " + containerID + " of type " + event.getType());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java
index d2ec207..4fa6c02 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java
@@ -130,8 +130,8 @@ class CGroupsHandlerImpl implements CGroupsHandler {
@Override
public String getControllerPath(CGroupController controller) {
+ rwLock.readLock().lock();
try {
- rwLock.readLock().lock();
return controllerPaths.get(controller);
} finally {
rwLock.readLock().unlock();
@@ -169,8 +169,8 @@ class CGroupsHandlerImpl implements CGroupsHandler {
}
// we want to do a bulk update without the paths changing concurrently
+ rwLock.writeLock().lock();
try {
- rwLock.writeLock().lock();
controllerPaths = cPaths;
parsedMtab = newMtab;
} finally {
@@ -293,10 +293,9 @@ class CGroupsHandlerImpl implements CGroupsHandler {
if (existingMountPath == null ||
!requestedMountPath.equals(existingMountPath)) {
+ //lock out other readers/writers till we are done
+ rwLock.writeLock().lock();
try {
- //lock out other readers/writers till we are done
- rwLock.writeLock().lock();
-
// If the controller was already mounted we have to mount it
// with the same options to clone the mount point otherwise
// the operation will fail
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
index 7cca7cf..25990d6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
@@ -116,8 +116,8 @@ public class LocalizedResource implements EventHandler<ResourceEvent> {
.append(getState() == ResourceState.LOCALIZED
? getLocalPath() + "," + getSize()
: "pending").append(",[");
+ this.readLock.lock();
try {
- this.readLock.lock();
for (ContainerId c : ref) {
sb.append("(").append(c.toString()).append(")");
}
@@ -187,9 +187,8 @@ public class LocalizedResource implements EventHandler<ResourceEvent> {
@Override
public void handle(ResourceEvent event) {
+ this.writeLock.lock();
try {
- this.writeLock.lock();
-
Path resourcePath = event.getLocalResourceRequest().getPath();
if (LOG.isDebugEnabled()) {
LOG.debug("Processing " + resourcePath + " of type " + event.getType());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index 1130c0c..000b73b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -1194,8 +1194,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
final Lock rLock = rwLock.readLock();
final Lock wLock = rwLock.writeLock();
+ wLock.lock();
try {
- wLock.lock();
Runnable runnable = new Runnable() {
@Override
public void run() {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
index 2820b18..d8150f0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
@@ -96,8 +96,8 @@ public class FifoCandidatesSelector
.getResToObtainByPartitionForLeafQueue(preemptionContext,
queueName, clusterResource);
+ leafQueue.getReadLock().lock();
try {
- leafQueue.getReadLock().lock();
// go through all ignore-partition-exclusivity containers first to make
// sure such containers will be preemptionCandidates first
Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityContainers =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
index 5661d4b..3780c73 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
@@ -178,8 +178,8 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
// 7. Based on the selected resource demand per partition, select
// containers with known policy from inter-queue preemption.
+ leafQueue.getReadLock().lock();
try {
- leafQueue.getReadLock().lock();
for (FiCaSchedulerApp app : apps) {
preemptFromLeastStarvedApp(leafQueue, app, selectedCandidates,
curCandidates, clusterResource, totalPreemptedResourceAllowed,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index 754a9e1..15513d2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -566,10 +566,9 @@ public class ProportionalCapacityPreemptionPolicy
Resource partitionResource, String partitionToLookAt) {
TempQueuePerPartition ret;
ReadLock readLock = curQueue.getReadLock();
+ // Acquire a read lock from Parent/LeafQueue.
+ readLock.lock();
try {
- // Acquire a read lock from Parent/LeafQueue.
- readLock.lock();
-
String queueName = curQueue.getQueueName();
QueueCapacities qc = curQueue.getQueueCapacities();
float absCap = qc.getAbsoluteCapacity(partitionToLookAt);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java
index 55fdb9c..d3edba4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java
@@ -158,9 +158,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
AttributeMappingOperationType op,
Map<NodeAttributeKey, RMNodeAttribute> newAttributesToBeAdded,
String attributePrefix) {
+ writeLock.lock();
try {
- writeLock.lock();
-
// shows node->attributes Mapped as part of this operation.
StringBuilder logMsg = new StringBuilder(op.name());
logMsg.append(" attributes on nodes:");
@@ -403,8 +402,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
public Map<NodeAttributeKey,
Map<String, AttributeValue>> getAttributesToNodes(
Set<NodeAttributeKey> attributes) {
+ readLock.lock();
try {
- readLock.lock();
boolean fetchAllAttributes = (attributes == null || attributes.isEmpty());
Map<NodeAttributeKey, Map<String, AttributeValue>> attributesToNodes =
new HashMap<>();
@@ -423,8 +422,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
}
public Resource getResourceByAttribute(NodeAttribute attribute) {
+ readLock.lock();
try {
- readLock.lock();
return clusterAttributes.containsKey(attribute.getAttributeKey())
? clusterAttributes.get(attribute.getAttributeKey()).getResource()
: Resource.newInstance(0, 0);
@@ -436,8 +435,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
@Override
public Map<NodeAttribute, AttributeValue> getAttributesForNode(
String hostName) {
+ readLock.lock();
try {
- readLock.lock();
return nodeCollections.containsKey(hostName)
? nodeCollections.get(hostName).getAttributes()
: new HashMap<>();
@@ -448,8 +447,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
@Override
public List<NodeToAttributes> getNodeToAttributes(Set<String> prefix) {
+ readLock.lock();
try {
- readLock.lock();
List<NodeToAttributes> nodeToAttributes = new ArrayList<>();
nodeCollections.forEach((k, v) -> {
List<NodeAttribute> attrs;
@@ -476,8 +475,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
@Override
public Map<String, Set<NodeAttribute>> getNodesToAttributes(
Set<String> hostNames) {
+ readLock.lock();
try {
- readLock.lock();
boolean fetchAllNodes = (hostNames == null || hostNames.isEmpty());
Map<String, Set<NodeAttribute>> nodeToAttrs = new HashMap<>();
if (fetchAllNodes) {
@@ -498,8 +497,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
}
public void activateNode(NodeId nodeId, Resource resource) {
+ writeLock.lock();
try {
- writeLock.lock();
String hostName = nodeId.getHost();
Host host = nodeCollections.get(hostName);
if (host == null) {
@@ -516,8 +515,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
}
public void deactivateNode(NodeId nodeId) {
+ writeLock.lock();
try {
- writeLock.lock();
Host host = nodeCollections.get(nodeId.getHost());
for (NodeAttribute attribute : host.getAttributes().keySet()) {
clusterAttributes.get(attribute.getAttributeKey())
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java
index 507f696..370a2f0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java
@@ -70,10 +70,9 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
@Override
public void addLabelsToNode(Map<NodeId, Set<String>> addedLabelsToNode)
- throws IOException {
+ throws IOException {
+ writeLock.lock();
try {
- writeLock.lock();
-
// get nodesCollection before edition
Map<String, Host> before = cloneNodeMap(addedLabelsToNode.keySet());
@@ -112,8 +111,8 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
@Override
public void removeFromClusterNodeLabels(Collection<String> labelsToRemove)
throws IOException {
+ writeLock.lock();
try {
- writeLock.lock();
if (!isInitNodeLabelStoreInProgress()) {
// We cannot remove node labels from collection when some queue(s) are
// using any of them.
@@ -137,8 +136,8 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
@Override
public void addToCluserNodeLabels(Collection<NodeLabel> labels)
throws IOException {
+ writeLock.lock();
try {
- writeLock.lock();
super.addToCluserNodeLabels(labels);
} finally {
writeLock.unlock();
@@ -149,9 +148,8 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
public void
removeLabelsFromNode(Map<NodeId, Set<String>> removeLabelsFromNode)
throws IOException {
+ writeLock.lock();
try {
- writeLock.lock();
-
// get nodesCollection before edition
Map<String, Host> before =
cloneNodeMap(removeLabelsFromNode.keySet());
@@ -171,9 +169,8 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
@Override
public void replaceLabelsOnNode(Map<NodeId, Set<String>> replaceLabelsToNode)
throws IOException {
+ writeLock.lock();
try {
- writeLock.lock();
-
Map<NodeId, Set<String>> effectiveModifiedLabelMappings =
getModifiedNodeLabelsMappings(replaceLabelsToNode);
@@ -230,9 +227,8 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
* will update running nodes resource
*/
public void activateNode(NodeId nodeId, Resource resource) {
+ writeLock.lock();
try {
- writeLock.lock();
-
// save if we have a node before
Map<String, Host> before = cloneNodeMap(ImmutableSet.of(nodeId));
@@ -273,9 +269,8 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
* Following methods are used for setting if a node unregistered to RM
*/
public void deactivateNode(NodeId nodeId) {
+ writeLock.lock();
try {
- writeLock.lock();
-
// save if we have a node before
Map<String, Host> before = cloneNodeMap(ImmutableSet.of(nodeId));
Node nm = getNMInNodeSet(nodeId);
@@ -314,8 +309,8 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
}
public void reinitializeQueueLabels(Map<String, Set<String>> queueToLabels) {
+ writeLock.lock();
try {
- writeLock.lock();
// clear before set
this.queueCollections.clear();
@@ -347,8 +342,8 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
public Resource getQueueResource(String queueName, Set<String> queueLabels,
Resource clusterResource) {
+ readLock.lock();
try {
- readLock.lock();
if (queueLabels.contains(ANY)) {
return clusterResource;
}
@@ -369,8 +364,8 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
if (label == null) {
return 0;
}
+ readLock.lock();
try {
- readLock.lock();
RMNodeLabel labelInfo = labelCollections.get(label);
return (labelInfo == null) ? 0 : labelInfo.getNumActiveNMs();
} finally {
@@ -379,8 +374,8 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
}
public Set<String> getLabelsOnNode(NodeId nodeId) {
+ readLock.lock();
try {
- readLock.lock();
Set<String> nodeLabels = getLabelsByNode(nodeId);
return Collections.unmodifiableSet(nodeLabels);
} finally {
@@ -389,8 +384,8 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
}
public boolean containsNodeLabel(String label) {
+ readLock.lock();
try {
- readLock.lock();
return label != null
&& (label.isEmpty() || labelCollections.containsKey(label));
} finally {
@@ -522,8 +517,8 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
if (label.equals(NO_LABEL)) {
return noNodeLabel.getResource();
}
+ readLock.lock();
try {
- readLock.lock();
RMNodeLabel nodeLabel = labelCollections.get(label);
if (nodeLabel == null) {
return Resources.none();
@@ -572,8 +567,8 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
}
public List<RMNodeLabel> pullRMNodeLabelsInfo() {
+ readLock.lock();
try {
- readLock.lock();
List<RMNodeLabel> infos = new ArrayList<RMNodeLabel>();
for (Entry<String, RMNodeLabel> entry : labelCollections.entrySet()) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java
index 4537e83..6a6c3b9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java
@@ -45,8 +45,8 @@ public class PlacementManager {
}
public void updateRules(List<PlacementRule> rules) {
+ writeLock.lock();
try {
- writeLock.lock();
this.rules = rules;
} finally {
writeLock.unlock();
@@ -55,10 +55,8 @@ public class PlacementManager {
public ApplicationPlacementContext placeApplication(
ApplicationSubmissionContext asc, String user) throws YarnException {
-
+ readLock.lock();
try {
- readLock.lock();
-
if (null == rules || rules.isEmpty()) {
return null;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 342ddd1..9a16b77 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -1776,8 +1776,8 @@ public class RMAppImpl implements RMApp, Recoverable {
@Override
public Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp() {
+ this.readLock.lock();
try {
- this.readLock.lock();
if (!isLogAggregationFinished() && isAppInFinalState(this) &&
systemClock.getTime() > this.logAggregationStartTime
+ this.logAggregationStatusTimeout) {
@@ -1801,8 +1801,8 @@ public class RMAppImpl implements RMApp, Recoverable {
}
public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) {
+ this.writeLock.lock();
try {
- this.writeLock.lock();
if (this.logAggregationEnabled && !isLogAggregationFinished()) {
LogAggregationReport curReport = this.logAggregationStatus.get(nodeId);
boolean stateChangedToFinal = false;
@@ -1851,8 +1851,8 @@ public class RMAppImpl implements RMApp, Recoverable {
@Override
public LogAggregationStatus getLogAggregationStatusForAppReport() {
+ this.readLock.lock();
try {
- this.readLock.lock();
if (! logAggregationEnabled) {
return LogAggregationStatus.DISABLED;
}
@@ -2022,8 +2022,8 @@ public class RMAppImpl implements RMApp, Recoverable {
}
public String getLogAggregationFailureMessagesForNM(NodeId nodeId) {
+ this.readLock.lock();
try {
- this.readLock.lock();
List<String> failureMessages =
this.logAggregationFailureMessagesForNMs.get(nodeId);
if (failureMessages == null || failureMessages.isEmpty()) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index be1d98d..e951bb0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -1587,8 +1587,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
&& this.getFinishTime() < (end - attemptFailuresValidityInterval)) {
return false;
}
+ this.readLock.lock();
try {
- this.readLock.lock();
int exitStatus = getAMContainerExitStatus();
return !(exitStatus == ContainerExitStatus.PREEMPTED
|| exitStatus == ContainerExitStatus.ABORTED
@@ -2274,8 +2274,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
@Override
public long getFinishTime() {
+ this.readLock.lock();
try {
- this.readLock.lock();
return this.finishTime;
} finally {
this.readLock.unlock();
@@ -2283,8 +2283,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
}
private void setFinishTime(long finishTime) {
+ this.writeLock.lock();
try {
- this.writeLock.lock();
this.finishTime = finishTime;
} finally {
this.writeLock.unlock();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java
index e297ac7..77c2d64 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java
@@ -73,8 +73,8 @@ public class RMAppAttemptMetrics {
}
public void updatePreemptionInfo(Resource resource, RMContainer container) {
+ writeLock.lock();
try {
- writeLock.lock();
resourcePreempted = Resources.addTo(resourcePreempted, resource);
} finally {
writeLock.unlock();
@@ -97,8 +97,8 @@ public class RMAppAttemptMetrics {
}
public Resource getResourcePreempted() {
+ readLock.lock();
try {
- readLock.lock();
return Resource.newInstance(resourcePreempted);
} finally {
readLock.unlock();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index b6b5fe8..a251dcd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -307,8 +307,8 @@ public class RMContainerImpl implements RMContainer {
@Override
public Resource getAllocatedResource() {
+ readLock.lock();
try {
- readLock.lock();
return container.getResource();
} finally {
readLock.unlock();
@@ -317,8 +317,8 @@ public class RMContainerImpl implements RMContainer {
@Override
public Resource getLastConfirmedResource() {
+ readLock.lock();
try {
- readLock.lock();
return this.lastConfirmedResource;
} finally {
readLock.unlock();
@@ -347,8 +347,8 @@ public class RMContainerImpl implements RMContainer {
@Override
public long getFinishTime() {
+ readLock.lock();
try {
- readLock.lock();
return finishTime;
} finally {
readLock.unlock();
@@ -357,8 +357,8 @@ public class RMContainerImpl implements RMContainer {
@Override
public String getDiagnosticsInfo() {
+ readLock.lock();
try {
- readLock.lock();
if (finishedStatus != null) {
return finishedStatus.getDiagnostics();
} else {
@@ -371,8 +371,8 @@ public class RMContainerImpl implements RMContainer {
@Override
public String getLogURL() {
+ readLock.lock();
try {
- readLock.lock();
StringBuilder logURL = new StringBuilder();
logURL.append(WebAppUtils.getHttpSchemePrefix(rmContext
.getYarnConfiguration()));
@@ -387,8 +387,8 @@ public class RMContainerImpl implements RMContainer {
@Override
public int getContainerExitStatus() {
+ readLock.lock();
try {
- readLock.lock();
if (finishedStatus != null) {
return finishedStatus.getExitStatus();
} else {
@@ -401,8 +401,8 @@ public class RMContainerImpl implements RMContainer {
@Override
public ContainerState getContainerState() {
+ readLock.lock();
try {
- readLock.lock();
if (finishedStatus != null) {
return finishedStatus.getState();
} else {
@@ -415,8 +415,8 @@ public class RMContainerImpl implements RMContainer {
@Override
public ContainerRequest getContainerRequest() {
+ readLock.lock();
try {
- readLock.lock();
return containerRequestForRecovery;
} finally {
readLock.unlock();
@@ -439,8 +439,8 @@ public class RMContainerImpl implements RMContainer {
@Override
public boolean isAMContainer() {
+ readLock.lock();
try {
- readLock.lock();
return isAMContainer;
} finally {
readLock.unlock();
@@ -448,8 +448,8 @@ public class RMContainerImpl implements RMContainer {
}
public void setAMContainer(boolean isAMContainer) {
+ writeLock.lock();
try {
- writeLock.lock();
this.isAMContainer = isAMContainer;
} finally {
writeLock.unlock();
@@ -471,8 +471,9 @@ public class RMContainerImpl implements RMContainer {
LOG.debug("Processing " + event.getContainerId() + " of type " + event
.getType());
}
+
+ writeLock.lock();
try {
- writeLock.lock();
RMContainerState oldState = getState();
try {
stateMachine.doTransition(event.getType(), event);
@@ -810,8 +811,8 @@ public class RMContainerImpl implements RMContainer {
@Override
public String getNodeHttpAddress() {
+ readLock.lock();
try {
- readLock.lock();
if (container.getNodeHttpAddress() != null) {
StringBuilder httpAddress = new StringBuilder();
httpAddress.append(WebAppUtils.getHttpSchemePrefix(rmContext
@@ -894,8 +895,8 @@ public class RMContainerImpl implements RMContainer {
@Override
public Resource getAllocatedOrReservedResource() {
+ readLock.lock();
try {
- readLock.lock();
if (getState().equals(RMContainerState.RESERVED)) {
return getReservedResource();
} else {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index da03cbc..963b86d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -670,8 +670,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
public void handle(RMNodeEvent event) {
LOG.debug("Processing " + event.getNodeId() + " of type " + event.getType());
+ writeLock.lock();
try {
- writeLock.lock();
NodeState oldState = getState();
try {
stateMachine.doTransition(event.getType(), event);
@@ -1515,9 +1515,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
@Override
public List<Container> pullNewlyIncreasedContainers() {
+ writeLock.lock();
try {
- writeLock.lock();
-
if (nmReportedIncreasedContainers.isEmpty()) {
return Collections.emptyList();
} else {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractResourceUsage.java
index 664cb35..ad3bfb1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractResourceUsage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractResourceUsage.java
@@ -118,8 +118,8 @@ public class AbstractResourceUsage {
return normalize(noLabelUsages.resArr.get(type.idx));
}
+ readLock.lock();
try {
- readLock.lock();
UsageByLabel usage = usages.get(label);
if (null == usage) {
return Resources.none();
@@ -131,8 +131,8 @@ public class AbstractResourceUsage {
}
protected Resource _getAll(ResourceType type) {
+ readLock.lock();
try {
- readLock.lock();
Resource allOfType = Resources.createResource(0);
for (Map.Entry<String, UsageByLabel> usageEntry : usages.entrySet()) {
// all usages types are initialized
@@ -159,8 +159,8 @@ public class AbstractResourceUsage {
}
protected void _set(String label, ResourceType type, Resource res) {
+ writeLock.lock();
try {
- writeLock.lock();
UsageByLabel usage = getAndAddIfMissing(label);
usage.resArr.set(type.idx, res);
} finally {
@@ -169,8 +169,8 @@ public class AbstractResourceUsage {
}
protected void _inc(String label, ResourceType type, Resource res) {
+ writeLock.lock();
try {
- writeLock.lock();
UsageByLabel usage = getAndAddIfMissing(label);
usage.resArr.set(type.idx,
Resources.add(usage.resArr.get(type.idx), res));
@@ -180,8 +180,8 @@ public class AbstractResourceUsage {
}
protected void _dec(String label, ResourceType type, Resource res) {
+ writeLock.lock();
try {
- writeLock.lock();
UsageByLabel usage = getAndAddIfMissing(label);
usage.resArr.set(type.idx,
Resources.subtract(usage.resArr.get(type.idx), res));
@@ -192,8 +192,8 @@ public class AbstractResourceUsage {
@Override
public String toString() {
+ readLock.lock();
try {
- readLock.lock();
return usages.toString();
} finally {
readLock.unlock();
@@ -201,8 +201,8 @@ public class AbstractResourceUsage {
}
public Set<String> getNodePartitionsSet() {
+ readLock.lock();
try {
- readLock.lock();
return usages.keySet();
} finally {
readLock.unlock();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 098457c..7e22fd5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -343,8 +343,8 @@ public abstract class AbstractYarnScheduler
protected void containerLaunchedOnNode(
ContainerId containerId, SchedulerNode node) {
+ readLock.lock();
try {
- readLock.lock();
// Get the application for the finished container
SchedulerApplicationAttempt application =
getCurrentAttemptForContainer(containerId);
@@ -485,8 +485,8 @@ public abstract class AbstractYarnScheduler
public void recoverContainersOnNode(List<NMContainerStatus> containerReports,
RMNode nm) {
+ writeLock.lock();
try {
- writeLock.lock();
if (!rmContext.isWorkPreservingRecoveryEnabled()
|| containerReports == null || (containerReports != null
&& containerReports.isEmpty())) {
@@ -767,8 +767,8 @@ public abstract class AbstractYarnScheduler
@Override
public void moveAllApps(String sourceQueue, String destQueue)
throws YarnException {
+ writeLock.lock();
try {
- writeLock.lock();
// check if destination queue is a valid leaf queue
try {
getQueueInfo(destQueue, false, false);
@@ -798,8 +798,8 @@ public abstract class AbstractYarnScheduler
@Override
public void killAllAppsInQueue(String queueName)
throws YarnException {
+ writeLock.lock();
try {
- writeLock.lock();
// check if queue is a valid
List<ApplicationAttemptId> apps = getAppsInQueue(queueName);
if (apps == null) {
@@ -824,8 +824,8 @@ public abstract class AbstractYarnScheduler
*/
public void updateNodeResource(RMNode nm,
ResourceOption resourceOption) {
+ writeLock.lock();
try {
- writeLock.lock();
SchedulerNode node = getSchedulerNode(nm.getNodeID());
Resource newResource = resourceOption.getResource();
Resource oldResource = node.getTotalResource();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index e10b9b0..bd6e7ca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -135,8 +135,8 @@ public class AppSchedulingInfo {
}
public String getQueueName() {
+ this.readLock.lock();
try {
- this.readLock.lock();
return queue.getQueueName();
} finally {
this.readLock.unlock();
@@ -465,8 +465,8 @@ public class AppSchedulingInfo {
*/
public List<ResourceRequest> getAllResourceRequests() {
List<ResourceRequest> ret = new ArrayList<>();
+ this.readLock.lock();
try {
- this.readLock.lock();
for (AppPlacementAllocator ap : schedulerKeyToAppPlacementAllocator
.values()) {
ret.addAll(ap.getResourceRequests().values());
@@ -483,8 +483,8 @@ public class AppSchedulingInfo {
*/
public List<SchedulingRequest> getAllSchedulingRequests() {
List<SchedulingRequest> ret = new ArrayList<>();
+ this.readLock.lock();
try {
- this.readLock.lock();
schedulerKeyToAppPlacementAllocator.values().stream()
.filter(ap -> ap.getSchedulingRequest() != null)
.forEach(ap -> ret.add(ap.getSchedulingRequest()));
@@ -495,8 +495,8 @@ public class AppSchedulingInfo {
}
public PendingAsk getNextPendingAsk() {
+ readLock.lock();
try {
- readLock.lock();
SchedulerRequestKey firstRequestKey = schedulerKeys.first();
return getPendingAsk(firstRequestKey, ResourceRequest.ANY);
} finally {
@@ -511,8 +511,8 @@ public class AppSchedulingInfo {
public PendingAsk getPendingAsk(SchedulerRequestKey schedulerKey,
String resourceName) {
+ this.readLock.lock();
try {
- this.readLock.lock();
AppPlacementAllocator ap = schedulerKeyToAppPlacementAllocator.get(
schedulerKey);
return (ap == null) ? PendingAsk.ZERO : ap.getPendingAsk(resourceName);
@@ -547,9 +547,8 @@ public class AppSchedulingInfo {
public ContainerRequest allocate(NodeType type,
SchedulerNode node, SchedulerRequestKey schedulerKey,
Container containerAllocated) {
+ writeLock.lock();
try {
- writeLock.lock();
-
if (null != containerAllocated) {
updateMetricsForAllocatedContainer(type, node, containerAllocated);
}
@@ -568,8 +567,8 @@ public class AppSchedulingInfo {
}
public void move(Queue newQueue) {
+ this.writeLock.lock();
try {
- this.writeLock.lock();
QueueMetrics oldMetrics = queue.getMetrics();
QueueMetrics newMetrics = newQueue.getMetrics();
for (AppPlacementAllocator ap : schedulerKeyToAppPlacementAllocator
@@ -607,8 +606,8 @@ public class AppSchedulingInfo {
public void stop() {
// clear pending resources metrics for the application
+ this.writeLock.lock();
try {
- this.writeLock.lock();
QueueMetrics metrics = queue.getMetrics();
for (AppPlacementAllocator ap : schedulerKeyToAppPlacementAllocator
.values()) {
@@ -634,8 +633,8 @@ public class AppSchedulingInfo {
}
public void setQueue(Queue queue) {
+ this.writeLock.lock();
try {
- this.writeLock.lock();
this.queue = queue;
} finally {
this.writeLock.unlock();
@@ -663,8 +662,8 @@ public class AppSchedulingInfo {
if (rmContainer.getExecutionType() != ExecutionType.GUARANTEED) {
return;
}
+ this.writeLock.lock();
try {
- this.writeLock.lock();
QueueMetrics metrics = queue.getMetrics();
if (pending) {
// If there was any container to recover, the application was
@@ -691,8 +690,8 @@ public class AppSchedulingInfo {
*/
public boolean checkAllocation(NodeType type, SchedulerNode node,
SchedulerRequestKey schedulerKey) {
+ readLock.lock();
try {
- readLock.lock();
AppPlacementAllocator ap = schedulerKeyToAppPlacementAllocator.get(
schedulerKey);
if (null == ap) {
@@ -752,8 +751,8 @@ public class AppSchedulingInfo {
*/
public boolean canDelayTo(
SchedulerRequestKey schedulerKey, String resourceName) {
+ this.readLock.lock();
try {
- this.readLock.lock();
AppPlacementAllocator ap =
schedulerKeyToAppPlacementAllocator.get(schedulerKey);
return (ap == null) || ap.canDelayTo(resourceName);
@@ -773,8 +772,8 @@ public class AppSchedulingInfo {
*/
public boolean precheckNode(SchedulerRequestKey schedulerKey,
SchedulerNode schedulerNode, SchedulingMode schedulingMode) {
+ this.readLock.lock();
try {
- this.readLock.lock();
AppPlacementAllocator ap =
schedulerKeyToAppPlacementAllocator.get(schedulerKey);
return (ap != null) && ap.precheckNode(schedulerNode,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
index 37958de..c46c911 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
@@ -73,8 +73,8 @@ public class ResourceUsage extends AbstractResourceUsage {
}
public void copyAllUsed(AbstractResourceUsage other) {
+ writeLock.lock();
try {
- writeLock.lock();
for (Entry<String, UsageByLabel> entry : other.usages.entrySet()) {
setUsed(entry.getKey(), Resources.clone(entry.getValue().getUsed()));
}
@@ -285,8 +285,8 @@ public class ResourceUsage extends AbstractResourceUsage {
}
public Resource getCachedDemand(String label) {
+ readLock.lock();
try {
- readLock.lock();
Resource demand = Resources.createResource(0);
Resources.addTo(demand, getCachedUsed(label));
Resources.addTo(demand, getCachedPending(label));
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 8aa660d..2e8a7c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -253,8 +253,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
* @return live containers of the application
*/
public Collection<RMContainer> getLiveContainers() {
+ readLock.lock();
try {
- readLock.lock();
return new ArrayList<>(liveContainers.values());
} finally {
readLock.unlock();
@@ -307,8 +307,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
public PendingAsk getPendingAsk(
SchedulerRequestKey schedulerKey, String resourceName) {
+ readLock.lock();
try {
- readLock.lock();
return appSchedulingInfo.getPendingAsk(schedulerKey, resourceName);
} finally {
readLock.unlock();
@@ -321,8 +321,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
public int getOutstandingAsksCount(SchedulerRequestKey schedulerKey,
String resourceName) {
+ readLock.lock();
try {
- readLock.lock();
AppPlacementAllocator ap = appSchedulingInfo.getAppPlacementAllocator(
schedulerKey);
return ap == null ? 0 : ap.getOutstandingAsksCount(resourceName);
@@ -369,8 +369,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
public void addRMContainer(
ContainerId id, RMContainer rmContainer) {
+ writeLock.lock();
try {
- writeLock.lock();
if (!getApplicationAttemptId().equals(
rmContainer.getApplicationAttemptId()) &&
!liveContainers.containsKey(id)) {
@@ -393,8 +393,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
}
public void removeRMContainer(ContainerId containerId) {
+ writeLock.lock();
try {
- writeLock.lock();
RMContainer rmContainer = liveContainers.remove(containerId);
if (rmContainer != null) {
if (rmContainer.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
@@ -446,8 +446,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
public boolean updateResourceRequests(
List<ResourceRequest> requests) {
+ writeLock.lock();
try {
- writeLock.lock();
if (!isStopped) {
return appSchedulingInfo.updateResourceRequests(requests, false);
}
@@ -463,8 +463,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
return false;
}
+ writeLock.lock();
try {
- writeLock.lock();
if (!isStopped) {
return appSchedulingInfo.updateSchedulingRequests(requests, false);
}
@@ -476,8 +476,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
public void recoverResourceRequestsForContainer(
ContainerRequest containerRequest) {
+ writeLock.lock();
try {
- writeLock.lock();
if (!isStopped) {
appSchedulingInfo.updateResourceRequests(
containerRequest.getResourceRequests(), true);
@@ -488,8 +488,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
}
public void stop(RMAppAttemptState rmAppAttemptFinalState) {
+ writeLock.lock();
try {
- writeLock.lock();
// Cleanup all scheduling information
isStopped = true;
appSchedulingInfo.stop();
@@ -508,8 +508,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
*/
public List<RMContainer> getReservedContainers() {
List<RMContainer> list = new ArrayList<>();
+ readLock.lock();
try {
- readLock.lock();
for (Entry<SchedulerRequestKey, Map<NodeId, RMContainer>> e :
this.reservedContainers.entrySet()) {
list.addAll(e.getValue().values());
@@ -524,8 +524,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
public boolean reserveIncreasedContainer(SchedulerNode node,
SchedulerRequestKey schedulerKey, RMContainer rmContainer,
Resource reservedResource) {
+ writeLock.lock();
try {
- writeLock.lock();
if (commonReserve(node, schedulerKey, rmContainer, reservedResource)) {
attemptResourceUsage.incReserved(node.getPartition(), reservedResource);
// succeeded
@@ -573,8 +573,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
public RMContainer reserve(SchedulerNode node,
SchedulerRequestKey schedulerKey, RMContainer rmContainer,
Container container) {
+ writeLock.lock();
try {
- writeLock.lock();
// Create RMContainer if necessary
if (rmContainer == null) {
rmContainer = new RMContainerImpl(container, schedulerKey,
@@ -617,8 +617,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
public int getNumReservedContainers(
SchedulerRequestKey schedulerKey) {
+ readLock.lock();
try {
- readLock.lock();
Map<NodeId, RMContainer> map = this.reservedContainers.get(
schedulerKey);
return (map == null) ? 0 : map.size();
@@ -630,8 +630,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
@SuppressWarnings("unchecked")
public void containerLaunchedOnNode(ContainerId containerId,
NodeId nodeId) {
+ writeLock.lock();
try {
- writeLock.lock();
// Inform the container
RMContainer rmContainer = getRMContainer(containerId);
if (rmContainer == null) {
@@ -650,8 +650,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
public void showRequests() {
if (LOG.isDebugEnabled()) {
+ readLock.lock();
try {
- readLock.lock();
for (SchedulerRequestKey schedulerKey : getSchedulerKeys()) {
AppPlacementAllocator ap = getAppPlacementAllocator(schedulerKey);
if (ap != null &&
@@ -762,8 +762,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
* </code>.
*/
List<RMContainer> pullContainersToTransfer() {
+ writeLock.lock();
try {
- writeLock.lock();
recoveredPreviousAttemptContainers.clear();
return new ArrayList<>(liveContainers.values());
} finally {
@@ -777,8 +777,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
* <code>AllocateResponse#containersFromPreviousAttempts</code>.
*/
public List<Container> pullPreviousAttemptContainers() {
+ writeLock.lock();
try {
- writeLock.lock();
if (recoveredPreviousAttemptContainers.isEmpty()) {
return null;
}
@@ -796,8 +796,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
// some reason like DNS unavailable, do not return this container and keep it
// in the newlyAllocatedContainers waiting to be refetched.
public List<Container> pullNewlyAllocatedContainers() {
+ writeLock.lock();
try {
- writeLock.lock();
List<Container> returnContainerList = new ArrayList<Container>(
newlyAllocatedContainers.size());
@@ -912,8 +912,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
|| ContainerUpdateType.PROMOTE_EXECUTION_TYPE == updateTpe)) {
return updatedContainers;
}
+
+ writeLock.lock();
try {
- writeLock.lock();
Iterator<Map.Entry<ContainerId, RMContainer>> i =
newlyUpdatedContainers.entrySet().iterator();
while (i.hasNext()) {
@@ -960,8 +961,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
}
public List<NMToken> pullUpdatedNMTokens() {
+ writeLock.lock();
try {
- writeLock.lock();
List <NMToken> returnList = new ArrayList<>(updatedNMTokens);
updatedNMTokens.clear();
return returnList;
@@ -979,8 +980,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
public void updateBlacklist(List<String> blacklistAdditions,
List<String> blacklistRemovals) {
+ writeLock.lock();
try {
- writeLock.lock();
if (!isStopped) {
if (isWaitingForAMContainer()) {
// The request is for the AM-container, and the AM-container is
@@ -999,8 +1000,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
}
public boolean isPlaceBlacklisted(String resourceName) {
+ readLock.lock();
try {
- readLock.lock();
boolean forAMContainer = isWaitingForAMContainer();
return this.appSchedulingInfo.isPlaceBlacklisted(resourceName,
forAMContainer);
@@ -1103,8 +1104,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
}
public ApplicationResourceUsageReport getResourceUsageReport() {
+ writeLock.lock();
try {
- writeLock.lock();
AggregateAppResourceUsage runningResourceUsage =
getRunningAggregateAppResourceUsage();
Resource usedResourceClone = Resources.clone(
@@ -1154,8 +1155,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
public void transferStateFromPreviousAttempt(
SchedulerApplicationAttempt appAttempt) {
+ writeLock.lock();
try {
- writeLock.lock();
this.liveContainers = appAttempt.getLiveContainersMap();
// this.reReservations = appAttempt.reReservations;
this.attemptResourceUsage.copyAllUsed(appAttempt.attemptResourceUsage);
@@ -1172,8 +1173,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
}
public void move(Queue newQueue) {
+ writeLock.lock();
try {
- writeLock.lock();
QueueMetrics oldMetrics = queue.getMetrics();
QueueMetrics newMetrics = newQueue.getMetrics();
String newQueueName = newQueue.getQueueName();
@@ -1209,8 +1210,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
public void recoverContainer(SchedulerNode node,
RMContainer rmContainer) {
+ writeLock.lock();
try {
- writeLock.lock();
// recover app scheduling info
appSchedulingInfo.recoverContainer(rmContainer, node.getPartition());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java
index ac97d72..f351119 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java
@@ -77,8 +77,8 @@ public class AbstractAutoCreatedLeafQueue extends LeafQueue {
*/
public void setEntitlement(String nodeLabel, QueueEntitlement entitlement)
throws SchedulerDynamicEditException {
+ writeLock.lock();
try {
- writeLock.lock();
float capacity = entitlement.getCapacity();
if (capacity < 0 || capacity > 1.0f) {
throw new SchedulerDynamicEditException(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 5401383..358eade 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -279,8 +279,8 @@ public abstract class AbstractCSQueue implements CSQueue {
* @param maximumCapacity new max capacity
*/
void setMaxCapacity(float maximumCapacity) {
+ writeLock.lock();
try {
- writeLock.lock();
// Sanity check
CSQueueUtils.checkMaxCapacity(getQueueName(),
queueCapacities.getCapacity(), maximumCapacity);
@@ -301,8 +301,8 @@ public abstract class AbstractCSQueue implements CSQueue {
* @param maximumCapacity new max capacity
*/
void setMaxCapacity(String nodeLabel, float maximumCapacity) {
+ writeLock.lock();
try {
- writeLock.lock();
// Sanity check
CSQueueUtils.checkMaxCapacity(getQueueName(),
queueCapacities.getCapacity(nodeLabel), maximumCapacity);
@@ -333,8 +333,8 @@ public abstract class AbstractCSQueue implements CSQueue {
CapacitySchedulerConfiguration configuration) throws
IOException {
+ writeLock.lock();
try {
- writeLock.lock();
// get labels
this.accessibleLabels =
configuration.getAccessibleNodeLabels(getQueuePath());
@@ -750,8 +750,8 @@ public abstract class AbstractCSQueue implements CSQueue {
void allocateResource(Resource clusterResource,
Resource resource, String nodePartition) {
+ writeLock.lock();
try {
- writeLock.lock();
queueUsage.incUsed(nodePartition, resource);
++numContainers;
@@ -765,8 +765,8 @@ public abstract class AbstractCSQueue implements CSQueue {
protected void releaseResource(Resource clusterResource,
Resource resource, String nodePartition) {
+ writeLock.lock();
try {
- writeLock.lock();
queueUsage.decUsed(nodePartition, resource);
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
@@ -785,8 +785,8 @@ public abstract class AbstractCSQueue implements CSQueue {
@Private
public Map<AccessType, AccessControlList> getACLs() {
+ readLock.lock();
try {
- readLock.lock();
return acls;
} finally {
readLock.unlock();
@@ -938,8 +938,8 @@ public abstract class AbstractCSQueue implements CSQueue {
boolean canAssignToThisQueue(Resource clusterResource,
String nodePartition, ResourceLimits currentResourceLimits,
Resource resourceCouldBeUnreserved, SchedulingMode schedulingMode) {
+ readLock.lock();
try {
- readLock.lock();
// Get current limited resource:
// - When doing RESPECT_PARTITION_EXCLUSIVITY allocation, we will respect
// queues' max capacity.
@@ -1203,9 +1203,8 @@ public abstract class AbstractCSQueue implements CSQueue {
Resource netAllocated = Resources.subtract(required,
request.getTotalReleasedResource());
+ readLock.lock();
try {
- readLock.lock();
-
String partition = schedulerContainer.getNodePartition();
Resource maxResourceLimit;
if (allocation.getSchedulingMode()
@@ -1254,8 +1253,8 @@ public abstract class AbstractCSQueue implements CSQueue {
@Override
public void activeQueue() throws YarnException {
+ this.writeLock.lock();
try {
- this.writeLock.lock();
if (getState() == QueueState.RUNNING) {
LOG.info("The specified queue:" + queueName
+ " is already in the RUNNING state.");
@@ -1278,8 +1277,8 @@ public abstract class AbstractCSQueue implements CSQueue {
}
protected void appFinished() {
+ this.writeLock.lock();
try {
- this.writeLock.lock();
if (getState() == QueueState.DRAINING) {
if (getNumApplications() == 0) {
updateQueueState(QueueState.STOPPED);
@@ -1301,8 +1300,8 @@ public abstract class AbstractCSQueue implements CSQueue {
}
public void recoverDrainingState() {
+ this.writeLock.lock();
try {
- this.writeLock.lock();
if (getState() == QueueState.STOPPED) {
updateQueueState(QueueState.DRAINING);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java
index 9d38f79..eb3221e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java
@@ -54,9 +54,8 @@ public abstract class AbstractManagedParentQueue extends ParentQueue {
@Override
public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
throws IOException {
+ writeLock.lock();
try {
- writeLock.lock();
-
// Set new configs
setupQueueConfigs(clusterResource);
@@ -72,8 +71,8 @@ public abstract class AbstractManagedParentQueue extends ParentQueue {
*/
public void addChildQueue(CSQueue childQueue)
throws SchedulerDynamicEditException, IOException {
+ writeLock.lock();
try {
- writeLock.lock();
if (childQueue.getCapacity() > 0) {
throw new SchedulerDynamicEditException(
"Queue " + childQueue + " being added has non zero capacity.");
@@ -95,8 +94,8 @@ public abstract class AbstractManagedParentQueue extends ParentQueue {
*/
public void removeChildQueue(CSQueue childQueue)
throws SchedulerDynamicEditException {
+ writeLock.lock();
try {
- writeLock.lock();
if (childQueue.getCapacity() > 0) {
throw new SchedulerDynamicEditException(
"Queue " + childQueue + " being removed has non zero capacity.");
@@ -124,8 +123,8 @@ public abstract class AbstractManagedParentQueue extends ParentQueue {
public CSQueue removeChildQueue(String childQueueName)
throws SchedulerDynamicEditException {
CSQueue childQueue;
+ writeLock.lock();
try {
- writeLock.lock();
childQueue = this.csContext.getCapacitySchedulerQueueManager().getQueue(
childQueueName);
if (childQueue != null) {
@@ -141,8 +140,8 @@ public abstract class AbstractManagedParentQueue extends ParentQueue {
}
protected float sumOfChildCapacities() {
+ writeLock.lock();
try {
- writeLock.lock();
float ret = 0;
for (CSQueue l : childQueues) {
ret += l.getCapacity();
@@ -154,8 +153,8 @@ public abstract class AbstractManagedParentQueue extends ParentQueue {
}
protected float sumOfChildAbsCapacities() {
+ writeLock.lock();
try {
- writeLock.lock();
float ret = 0;
for (CSQueue l : childQueues) {
ret += l.getAbsoluteCapacity();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java
index e12b55e..b194ad8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java
@@ -49,9 +49,8 @@ public class AutoCreatedLeafQueue extends AbstractAutoCreatedLeafQueue {
@Override
public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
throws IOException {
+ writeLock.lock();
try {
- writeLock.lock();
-
validate(newlyParsedQueue);
ManagedParentQueue managedParentQueue = (ManagedParentQueue) parent;
@@ -72,8 +71,8 @@ public class AutoCreatedLeafQueue extends AbstractAutoCreatedLeafQueue {
public void reinitializeFromTemplate(AutoCreatedLeafQueueConfig
leafQueueTemplate) throws SchedulerDynamicEditException, IOException {
+ writeLock.lock();
try {
- writeLock.lock();
// TODO:
// reinitialize only capacities for now since 0 capacity updates
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 717d0a3..7b9a13c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -322,8 +322,8 @@ public class CapacityScheduler extends
@VisibleForTesting
void initScheduler(Configuration configuration) throws
IOException {
+ writeLock.lock();
try {
- writeLock.lock();
String confProviderStr = configuration.get(
YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
YarnConfiguration.DEFAULT_CONFIGURATION_STORE);
@@ -421,8 +421,8 @@ public class CapacityScheduler extends
}
private void startSchedulerThreads() {
+ writeLock.lock();
try {
- writeLock.lock();
activitiesManager.start();
if (scheduleAsynchronously) {
Preconditions.checkNotNull(asyncSchedulerThreads,
@@ -455,8 +455,8 @@ public class CapacityScheduler extends
@Override
public void serviceStop() throws Exception {
+ writeLock.lock();
try {
- writeLock.lock();
this.activitiesManager.stop();
if (scheduleAsynchronously && asyncSchedulerThreads != null) {
for (Thread t : asyncSchedulerThreads) {
@@ -479,8 +479,8 @@ public class CapacityScheduler extends
@Override
public void reinitialize(Configuration newConf, RMContext rmContext)
throws IOException {
+ writeLock.lock();
try {
- writeLock.lock();
Configuration configuration = new Configuration(newConf);
CapacitySchedulerConfiguration oldConf = this.conf;
this.conf = csConfProvider.loadConfiguration(configuration);
@@ -656,9 +656,8 @@ public class CapacityScheduler extends
try {
ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request =
backlogs.take();
-
+ cs.writeLock.lock();
try {
- cs.writeLock.lock();
cs.tryCommit(cs.getClusterResource(), request, true);
} finally {
cs.writeLock.unlock();
@@ -684,8 +683,8 @@ public class CapacityScheduler extends
@VisibleForTesting
public PlacementRule getUserGroupMappingPlacementRule() throws IOException {
+ readLock.lock();
try {
- readLock.lock();
UserGroupMappingPlacementRule ugRule = new UserGroupMappingPlacementRule();
ugRule.initialize(this);
return ugRule;
@@ -695,8 +694,8 @@ public class CapacityScheduler extends
}
public PlacementRule getAppNameMappingPlacementRule() throws IOException {
+ readLock.lock();
try {
- readLock.lock();
AppNameMappingPlacementRule anRule = new AppNameMappingPlacementRule();
anRule.initialize(this);
return anRule;
@@ -796,8 +795,8 @@ public class CapacityScheduler extends
private void addApplicationOnRecovery(ApplicationId applicationId,
String queueName, String user,
Priority priority, ApplicationPlacementContext placementContext) {
+ writeLock.lock();
try {
- writeLock.lock();
//check if the queue needs to be auto-created during recovery
CSQueue queue = getOrCreateQueueFromPlacementContext(applicationId, user,
queueName, placementContext, true);
@@ -920,8 +919,8 @@ public class CapacityScheduler extends
private void addApplication(ApplicationId applicationId, String queueName,
String user, Priority priority,
ApplicationPlacementContext placementContext) {
+ writeLock.lock();
try {
- writeLock.lock();
if (isSystemAppsLimitReached()) {
String message = "Maximum system application limit reached,"
+ "cannot accept submission of application: " + applicationId;
@@ -1019,8 +1018,8 @@ public class CapacityScheduler extends
ApplicationAttemptId applicationAttemptId,
boolean transferStateFromPreviousAttempt,
boolean isAttemptRecovering) {
+ writeLock.lock();
try {
- writeLock.lock();
SchedulerApplication<FiCaSchedulerApp> application = applications.get(
applicationAttemptId.getApplicationId());
if (application == null) {
@@ -1072,8 +1071,8 @@ public class CapacityScheduler extends
private void doneApplication(ApplicationId applicationId,
RMAppState finalState) {
+ writeLock.lock();
try {
- writeLock.lock();
SchedulerApplication<FiCaSchedulerApp> application = applications.get(
applicationId);
if (application == null) {
@@ -1099,8 +1098,8 @@ public class CapacityScheduler extends
private void doneApplicationAttempt(
ApplicationAttemptId applicationAttemptId,
RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
+ writeLock.lock();
try {
- writeLock.lock();
LOG.info("Application Attempt " + applicationAttemptId + " is done."
+ " finalState=" + rmAppAttemptFinalState);
@@ -1214,8 +1213,8 @@ public class CapacityScheduler extends
// make sure we aren't stopping/removing the application
// when the allocate comes in
+ application.getWriteLock().lock();
try {
- application.getWriteLock().lock();
if (application.isStopped()) {
return EMPTY_ALLOCATION;
}
@@ -1292,8 +1291,8 @@ public class CapacityScheduler extends
@Override
protected void nodeUpdate(RMNode rmNode) {
long begin = System.nanoTime();
+ readLock.lock();
try {
- readLock.lock();
setLastNodeUpdateTime(Time.now());
super.nodeUpdate(rmNode);
} finally {
@@ -1302,8 +1301,8 @@ public class CapacityScheduler extends
// Try to do scheduling
if (!scheduleAsynchronously) {
+ writeLock.lock();
try {
- writeLock.lock();
ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager,
rmNode.getNodeID());
@@ -1329,8 +1328,8 @@ public class CapacityScheduler extends
*/
private void updateNodeAndQueueResource(RMNode nm,
ResourceOption resourceOption) {
+ writeLock.lock();
try {
- writeLock.lock();
updateNodeResource(nm, resourceOption);
Resource clusterResource = getClusterResource();
getRootQueue().updateClusterResource(clusterResource,
@@ -1917,8 +1916,8 @@ public class CapacityScheduler extends
private void updateNodeAttributes(
NodeAttributesUpdateSchedulerEvent attributeUpdateEvent) {
+ writeLock.lock();
try {
- writeLock.lock();
for (Entry<String, Set<NodeAttribute>> entry : attributeUpdateEvent
.getUpdatedNodeToAttributes().entrySet()) {
String hostname = entry.getKey();
@@ -1944,8 +1943,8 @@ public class CapacityScheduler extends
*/
private void updateNodeLabelsAndQueueResource(
NodeLabelsUpdateSchedulerEvent labelUpdateEvent) {
+ writeLock.lock();
try {
- writeLock.lock();
Set<String> updateLabels = new HashSet<String>();
for (Entry<NodeId, Set<String>> entry : labelUpdateEvent
.getUpdatedNodeToLabels().entrySet()) {
@@ -1982,8 +1981,8 @@ public class CapacityScheduler extends
}
private void addNode(RMNode nodeManager) {
+ writeLock.lock();
try {
- writeLock.lock();
FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
usePortForNodeName, nodeManager.getNodeLabels());
nodeTracker.addNode(schedulerNode);
@@ -2019,8 +2018,8 @@ public class CapacityScheduler extends
}
private void removeNode(RMNode nodeInfo) {
+ writeLock.lock();
try {
- writeLock.lock();
// update this node to node label manager
if (labelManager != null) {
labelManager.deactivateNode(nodeInfo.getNodeID());
@@ -2164,8 +2163,8 @@ public class CapacityScheduler extends
public void markContainerForKillable(
RMContainer killableContainer) {
+ writeLock.lock();
try {
- writeLock.lock();
if (LOG.isDebugEnabled()) {
LOG.debug(SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE + ": container"
+ killableContainer.toString());
@@ -2200,8 +2199,8 @@ public class CapacityScheduler extends
private void markContainerForNonKillable(
RMContainer nonKillableContainer) {
+ writeLock.lock();
try {
- writeLock.lock();
if (LOG.isDebugEnabled()) {
LOG.debug(
SchedulerEventType.MARK_CONTAINER_FOR_NONKILLABLE + ": container"
@@ -2269,8 +2268,8 @@ public class CapacityScheduler extends
private String resolveReservationQueueName(String queueName,
ApplicationId applicationId, ReservationId reservationID,
boolean isRecovering) {
+ readLock.lock();
try {
- readLock.lock();
CSQueue queue = getQueue(queueName);
// Check if the queue is a plan queue
if ((queue == null) || !(queue instanceof PlanQueue)) {
@@ -2320,8 +2319,8 @@ public class CapacityScheduler extends
@Override
public void removeQueue(String queueName)
throws SchedulerDynamicEditException {
+ writeLock.lock();
try {
- writeLock.lock();
LOG.info("Removing queue: " + queueName);
CSQueue q = this.getQueue(queueName);
if (!(AbstractAutoCreatedLeafQueue.class.isAssignableFrom(
@@ -2354,8 +2353,8 @@ public class CapacityScheduler extends
@Override
public void addQueue(Queue queue)
throws SchedulerDynamicEditException, IOException {
+ writeLock.lock();
try {
- writeLock.lock();
if (queue == null) {
throw new SchedulerDynamicEditException(
"Queue specified is null. Should be an implementation of "
@@ -2392,8 +2391,8 @@ public class CapacityScheduler extends
@Override
public void setEntitlement(String inQueue, QueueEntitlement entitlement)
throws YarnException {
+ writeLock.lock();
try {
- writeLock.lock();
LeafQueue queue = this.queueManager.getAndCheckLeafQueue(inQueue);
AbstractManagedParentQueue parent =
(AbstractManagedParentQueue) queue.getParent();
@@ -2429,8 +2428,8 @@ public class CapacityScheduler extends
@Override
public String moveApplication(ApplicationId appId,
String targetQueueName) throws YarnException {
+ writeLock.lock();
try {
- writeLock.lock();
SchedulerApplication<FiCaSchedulerApp> application =
applications.get(appId);
if (application == null) {
@@ -2481,8 +2480,8 @@ public class CapacityScheduler extends
@Override
public void preValidateMoveApplication(ApplicationId appId,
String newQueue) throws YarnException {
+ writeLock.lock();
try {
- writeLock.lock();
SchedulerApplication<FiCaSchedulerApp> application =
applications.get(appId);
if (application == null) {
@@ -2604,8 +2603,8 @@ public class CapacityScheduler extends
public Priority checkAndGetApplicationPriority(
Priority priorityRequestedByApp, UserGroupInformation user,
String queueName, ApplicationId applicationId) throws YarnException {
+ readLock.lock();
try {
- readLock.lock();
Priority appPriority = priorityRequestedByApp;
// Verify the scenario where priority is null from submissionContext.
@@ -2660,8 +2659,8 @@ public class CapacityScheduler extends
ApplicationId applicationId, SettableFuture<Object> future,
UserGroupInformation user)
throws YarnException {
+ writeLock.lock();
try {
- writeLock.lock();
Priority appPriority = null;
SchedulerApplication<FiCaSchedulerApp> application = applications
.get(applicationId);
@@ -3065,9 +3064,8 @@ public class CapacityScheduler extends
*/
public boolean moveReservedContainer(RMContainer toBeMovedContainer,
FiCaSchedulerNode targetNode) {
+ writeLock.lock();
try {
- writeLock.lock();
-
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to move container=" + toBeMovedContainer + " to node="
+ targetNode.getNodeID());
@@ -3121,8 +3119,8 @@ public class CapacityScheduler extends
@Override
public long checkAndGetApplicationLifetime(String queueName,
long lifetimeRequestedByApp) {
+ readLock.lock();
try {
- readLock.lock();
CSQueue queue = getQueue(queueName);
if (queue == null || !(queue instanceof LeafQueue)) {
return lifetimeRequestedByApp;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index be2784b..19a5647 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -169,8 +169,8 @@ public class LeafQueue extends AbstractCSQueue {
protected void setupQueueConfigs(Resource clusterResource,
CapacitySchedulerConfiguration conf) throws
IOException {
+ writeLock.lock();
try {
- writeLock.lock();
CapacitySchedulerConfiguration schedConf = csContext.getConfiguration();
super.setupQueueConfigs(clusterResource, conf);
@@ -402,8 +402,8 @@ public class LeafQueue extends AbstractCSQueue {
@Override
public int getNumApplications() {
+ readLock.lock();
try {
- readLock.lock();
return getNumPendingApplications() + getNumActiveApplications();
} finally {
readLock.unlock();
@@ -411,8 +411,8 @@ public class LeafQueue extends AbstractCSQueue {
}
public int getNumPendingApplications() {
+ readLock.lock();
try {
- readLock.lock();
return pendingOrderingPolicy.getNumSchedulableEntities();
} finally {
readLock.unlock();
@@ -420,8 +420,8 @@ public class LeafQueue extends AbstractCSQueue {
}
public int getNumActiveApplications() {
+ readLock.lock();
try {
- readLock.lock();
return orderingPolicy.getNumSchedulableEntities();
} finally {
readLock.unlock();
@@ -430,8 +430,8 @@ public class LeafQueue extends AbstractCSQueue {
@Private
public int getNumPendingApplications(String user) {
+ readLock.lock();
try {
- readLock.lock();
User u = getUser(user);
if (null == u) {
return 0;
@@ -444,8 +444,8 @@ public class LeafQueue extends AbstractCSQueue {
@Private
public int getNumActiveApplications(String user) {
+ readLock.lock();
try {
- readLock.lock();
User u = getUser(user);
if (null == u) {
return 0;
@@ -476,8 +476,8 @@ public class LeafQueue extends AbstractCSQueue {
@Override
public List<QueueUserACLInfo>
getQueueUserAclInfo(UserGroupInformation user) {
+ readLock.lock();
try {
- readLock.lock();
QueueUserACLInfo userAclInfo = recordFactory.newRecordInstance(
QueueUserACLInfo.class);
List<QueueACL> operations = new ArrayList<>();
@@ -497,8 +497,8 @@ public class LeafQueue extends AbstractCSQueue {
}
public String toString() {
+ readLock.lock();
try {
- readLock.lock();
return queueName + ": " + "capacity=" + queueCapacities.getCapacity()
+ ", " + "absoluteCapacity=" + queueCapacities.getAbsoluteCapacity()
+ ", " + "usedResources=" + queueUsage.getUsed() + ", "
@@ -522,8 +522,8 @@ public class LeafQueue extends AbstractCSQueue {
@Private
public List<AppPriorityACLGroup> getPriorityACLs() {
+ readLock.lock();
try {
- readLock.lock();
return new ArrayList<>(priorityAcls);
} finally {
readLock.unlock();
@@ -535,8 +535,8 @@ public class LeafQueue extends AbstractCSQueue {
CapacitySchedulerConfiguration configuration) throws
IOException {
+ writeLock.lock();
try {
- writeLock.lock();
// Sanity check
if (!(newlyParsedQueue instanceof LeafQueue) || !newlyParsedQueue
.getQueuePath().equals(getQueuePath())) {
@@ -582,9 +582,8 @@ public class LeafQueue extends AbstractCSQueue {
public void submitApplicationAttempt(FiCaSchedulerApp application,
String userName) {
// Careful! Locking order is important!
+ writeLock.lock();
try {
- writeLock.lock();
-
// TODO, should use getUser, use this method just to avoid UT failure
// which is caused by wrong invoking order, will fix UT separately
User user = usersManager.getUserAndAddIfAbsent(userName);
@@ -622,8 +621,8 @@ public class LeafQueue extends AbstractCSQueue {
public void validateSubmitApplication(ApplicationId applicationId,
String userName, String queue) throws AccessControlException {
+ writeLock.lock();
try {
- writeLock.lock();
// Check if the queue is accepting jobs
if (getState() != QueueState.RUNNING) {
String msg = "Queue " + getQueuePath()
@@ -691,8 +690,9 @@ public class LeafQueue extends AbstractCSQueue {
if (userName != null && getUser(userName) != null) {
userWeight = getUser(userName).getWeight();
}
+
+ readLock.lock();
try {
- readLock.lock();
/*
* The user am resource limit is based on the same approach as the user
* limit (as it should represent a subset of that). This means that it uses
@@ -741,8 +741,8 @@ public class LeafQueue extends AbstractCSQueue {
public Resource calculateAndGetAMResourceLimitPerPartition(
String nodePartition) {
+ writeLock.lock();
try {
- writeLock.lock();
/*
* For non-labeled partition, get the max value from resources currently
* available to the queue and the absolute resources guaranteed for the
@@ -794,8 +794,8 @@ public class LeafQueue extends AbstractCSQueue {
}
protected void activateApplications() {
+ writeLock.lock();
try {
- writeLock.lock();
// limit of allowed resource usage for application masters
Map<String, Resource> userAmPartitionLimit =
new HashMap<String, Resource>();
@@ -916,8 +916,8 @@ public class LeafQueue extends AbstractCSQueue {
private void addApplicationAttempt(FiCaSchedulerApp application,
User user) {
+ writeLock.lock();
try {
- writeLock.lock();
// Accept
user.submitApplication();
getPendingAppsOrderingPolicy().addSchedulableEntity(application);
@@ -969,9 +969,9 @@ public class LeafQueue extends AbstractCSQueue {
private void removeApplicationAttempt(
FiCaSchedulerApp application, String userName) {
- try {
- writeLock.lock();
+ writeLock.lock();
+ try {
// TODO, should use getUser, use this method just to avoid UT failure
// which is caused by wrong invoking order, will fix UT separately
User user = usersManager.getUserAndAddIfAbsent(userName);
@@ -1228,8 +1228,8 @@ public class LeafQueue extends AbstractCSQueue {
// Do not check limits when allocation from a reserved container
if (allocation.getAllocateFromReservedContainer() == null) {
+ readLock.lock();
try {
- readLock.lock();
FiCaSchedulerApp app =
schedulerContainer.getSchedulerApplicationAttempt();
String username = app.getUser();
@@ -1329,9 +1329,8 @@ public class LeafQueue extends AbstractCSQueue {
releaseContainers(cluster, request);
+ writeLock.lock();
try {
- writeLock.lock();
-
if (request.anythingAllocatedOrReserved()) {
ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode>
allocation = request.getFirstAllocatedOrReservedContainer();
@@ -1549,8 +1548,9 @@ public class LeafQueue extends AbstractCSQueue {
protected boolean canAssignToUser(Resource clusterResource,
String userName, Resource limit, FiCaSchedulerApp application,
String nodePartition, ResourceLimits currentResourceLimits) {
+
+ readLock.lock();
try {
- readLock.lock();
User user = getUser(userName);
if (user == null) {
if (LOG.isDebugEnabled()) {
@@ -1631,8 +1631,8 @@ public class LeafQueue extends AbstractCSQueue {
*/
public void recalculateQueueUsageRatio(Resource clusterResource,
String nodePartition) {
+ writeLock.lock();
try {
- writeLock.lock();
ResourceUsage queueResourceUsage = getQueueResourceUsage();
if (nodePartition == null) {
@@ -1661,8 +1661,8 @@ public class LeafQueue extends AbstractCSQueue {
boolean removed = false;
// Careful! Locking order is important!
+ writeLock.lock();
try {
- writeLock.lock();
Container container = rmContainer.getContainer();
// Inform the application & the node
@@ -1714,8 +1714,8 @@ public class LeafQueue extends AbstractCSQueue {
void allocateResource(Resource clusterResource,
SchedulerApplicationAttempt application, Resource resource,
String nodePartition, RMContainer rmContainer) {
+ writeLock.lock();
try {
- writeLock.lock();
super.allocateResource(clusterResource, resource, nodePartition);
// handle ignore exclusivity container
@@ -1759,8 +1759,8 @@ public class LeafQueue extends AbstractCSQueue {
void releaseResource(Resource clusterResource,
FiCaSchedulerApp application, Resource resource, String nodePartition,
RMContainer rmContainer) {
+ writeLock.lock();
try {
- writeLock.lock();
super.releaseResource(clusterResource, resource, nodePartition);
// handle ignore exclusivity container
@@ -1815,8 +1815,8 @@ public class LeafQueue extends AbstractCSQueue {
@Override
public void updateClusterResource(Resource clusterResource,
ResourceLimits currentResourceLimits) {
+ writeLock.lock();
try {
- writeLock.lock();
updateCurrentResourceLimits(currentResourceLimits, clusterResource);
lastClusterResource = clusterResource;
@@ -1898,8 +1898,8 @@ public class LeafQueue extends AbstractCSQueue {
return;
}
// Careful! Locking order is important!
+ writeLock.lock();
try {
- writeLock.lock();
FiCaSchedulerNode node = scheduler.getNode(
rmContainer.getContainer().getNodeId());
allocateResource(clusterResource, attempt,
@@ -1962,8 +1962,8 @@ public class LeafQueue extends AbstractCSQueue {
public Resource getTotalPendingResourcesConsideringUserLimit(
Resource clusterResources, String partition,
boolean deductReservedFromPending) {
+ readLock.lock();
try {
- readLock.lock();
Map<String, Resource> userNameToHeadroom =
new HashMap<>();
Resource totalPendingConsideringUserLimit = Resource.newInstance(0, 0);
@@ -2006,8 +2006,8 @@ public class LeafQueue extends AbstractCSQueue {
@Override
public void collectSchedulerApplications(
Collection<ApplicationAttemptId> apps) {
+ readLock.lock();
try {
- readLock.lock();
for (FiCaSchedulerApp pendingApp : pendingOrderingPolicy
.getSchedulableEntities()) {
apps.add(pendingApp.getApplicationAttemptId());
@@ -2066,9 +2066,9 @@ public class LeafQueue extends AbstractCSQueue {
public Map<String, TreeSet<RMContainer>>
getIgnoreExclusivityRMContainers() {
Map<String, TreeSet<RMContainer>> clonedMap = new HashMap<>();
- try {
- readLock.lock();
+ readLock.lock();
+ try {
for (Map.Entry<String, TreeSet<RMContainer>> entry : ignorePartitionExclusivityRMContainers
.entrySet()) {
clonedMap.put(entry.getKey(), new TreeSet<>(entry.getValue()));
@@ -2117,8 +2117,8 @@ public class LeafQueue extends AbstractCSQueue {
void setOrderingPolicy(
OrderingPolicy<FiCaSchedulerApp> orderingPolicy) {
+ writeLock.lock();
try {
- writeLock.lock();
if (null != this.orderingPolicy) {
orderingPolicy.addAllSchedulableEntities(
this.orderingPolicy.getSchedulableEntities());
@@ -2136,8 +2136,8 @@ public class LeafQueue extends AbstractCSQueue {
public void updateApplicationPriority(SchedulerApplication<FiCaSchedulerApp> app,
Priority newAppPriority) {
+ writeLock.lock();
try {
- writeLock.lock();
FiCaSchedulerApp attempt = app.getCurrentAppAttempt();
boolean isActive = orderingPolicy.removeSchedulableEntity(attempt);
if (!isActive) {
@@ -2188,8 +2188,8 @@ public class LeafQueue extends AbstractCSQueue {
@Override
public void stopQueue() {
+ writeLock.lock();
try {
- writeLock.lock();
if (getNumApplications() > 0) {
updateQueueState(QueueState.DRAINING);
} else {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java
index 6788bb4..6c40a23 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java
@@ -74,8 +74,8 @@ public class ManagedParentQueue extends AbstractManagedParentQueue {
public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
throws IOException {
+ writeLock.lock();
try {
- writeLock.lock();
validate(newlyParsedQueue);
shouldFailAutoCreationWhenGuaranteedCapacityExceeded =
@@ -184,9 +184,9 @@ public class ManagedParentQueue extends AbstractManagedParentQueue {
@Override
public void addChildQueue(CSQueue childQueue)
throws SchedulerDynamicEditException, IOException {
- try {
- writeLock.lock();
+ writeLock.lock();
+ try {
if (childQueue == null || !(childQueue instanceof AutoCreatedLeafQueue)) {
throw new SchedulerDynamicEditException(
"Expected child queue to be an instance of AutoCreatedLeafQueue");
@@ -231,8 +231,8 @@ public class ManagedParentQueue extends AbstractManagedParentQueue {
}
public List<FiCaSchedulerApp> getScheduleableApplications() {
+ readLock.lock();
try {
- readLock.lock();
List<FiCaSchedulerApp> apps = new ArrayList<>();
for (CSQueue childQueue : getChildQueues()) {
apps.addAll(((LeafQueue) childQueue).getApplications());
@@ -244,8 +244,8 @@ public class ManagedParentQueue extends AbstractManagedParentQueue {
}
public List<FiCaSchedulerApp> getPendingApplications() {
+ readLock.lock();
try {
- readLock.lock();
List<FiCaSchedulerApp> apps = new ArrayList<>();
for (CSQueue childQueue : getChildQueues()) {
apps.addAll(((LeafQueue) childQueue).getPendingApplications());
@@ -257,8 +257,8 @@ public class ManagedParentQueue extends AbstractManagedParentQueue {
}
public List<FiCaSchedulerApp> getAllApplications() {
+ readLock.lock();
try {
- readLock.lock();
List<FiCaSchedulerApp> apps = new ArrayList<>();
for (CSQueue childQueue : getChildQueues()) {
apps.addAll(((LeafQueue) childQueue).getAllApplications());
@@ -286,9 +286,9 @@ public class ManagedParentQueue extends AbstractManagedParentQueue {
public void validateAndApplyQueueManagementChanges(
List<QueueManagementChange> queueManagementChanges)
throws IOException, SchedulerDynamicEditException {
- try {
- writeLock.lock();
+ writeLock.lock();
+ try {
validateQueueManagementChanges(queueManagementChanges);
applyQueueManagementChanges(queueManagementChanges);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index ddf4bf4..c9d0324 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -126,8 +126,8 @@ public class ParentQueue extends AbstractCSQueue {
protected void setupQueueConfigs(Resource clusterResource)
throws IOException {
+ writeLock.lock();
try {
- writeLock.lock();
super.setupQueueConfigs(clusterResource);
StringBuilder aclsString = new StringBuilder();
for (Map.Entry<AccessType, AccessControlList> e : acls.entrySet()) {
@@ -166,8 +166,8 @@ public class ParentQueue extends AbstractCSQueue {
private static float PRECISION = 0.0005f; // 0.05% precision
void setChildQueues(Collection<CSQueue> childQueues) {
+ writeLock.lock();
try {
- writeLock.lock();
// Validate
float childCapacities = 0;
Resource minResDefaultLabel = Resources.createResource(0, 0);
@@ -257,8 +257,8 @@ public class ParentQueue extends AbstractCSQueue {
@Override
public QueueInfo getQueueInfo(
boolean includeChildQueues, boolean recursive) {
+ readLock.lock();
try {
- readLock.lock();
QueueInfo queueInfo = getQueueInfo();
List<QueueInfo> childQueuesInfo = new ArrayList<>();
@@ -279,8 +279,8 @@ public class ParentQueue extends AbstractCSQueue {
private QueueUserACLInfo getUserAclInfo(
UserGroupInformation user) {
+ readLock.lock();
try {
- readLock.lock();
QueueUserACLInfo userAclInfo = recordFactory.newRecordInstance(
QueueUserACLInfo.class);
List<QueueACL> operations = new ArrayList<QueueACL>();
@@ -302,8 +302,8 @@ public class ParentQueue extends AbstractCSQueue {
@Override
public List<QueueUserACLInfo> getQueueUserAclInfo(
UserGroupInformation user) {
+ readLock.lock();
try {
- readLock.lock();
List<QueueUserACLInfo> userAcls = new ArrayList<>();
// Add parent queue acls
@@ -335,8 +335,8 @@ public class ParentQueue extends AbstractCSQueue {
@Override
public void reinitialize(CSQueue newlyParsedQueue,
Resource clusterResource) throws IOException {
+ writeLock.lock();
try {
- writeLock.lock();
// Sanity check
if (!(newlyParsedQueue instanceof ParentQueue) || !newlyParsedQueue
.getQueuePath().equals(getQueuePath())) {
@@ -430,9 +430,8 @@ public class ParentQueue extends AbstractCSQueue {
@Override
public void submitApplication(ApplicationId applicationId, String user,
String queue) throws AccessControlException {
-
+ writeLock.lock();
try {
- writeLock.lock();
// Sanity check
validateSubmitApplication(applicationId, user, queue);
@@ -456,8 +455,8 @@ public class ParentQueue extends AbstractCSQueue {
public void validateSubmitApplication(ApplicationId applicationId,
String userName, String queue) throws AccessControlException {
+ writeLock.lock();
try {
- writeLock.lock();
if (queue.equals(queueName)) {
throw new AccessControlException(
"Cannot submit application " + "to non-leaf queue: " + queueName);
@@ -487,9 +486,8 @@ public class ParentQueue extends AbstractCSQueue {
private void addApplication(ApplicationId applicationId,
String user) {
-
+ writeLock.lock();
try {
- writeLock.lock();
++numApplications;
LOG.info(
@@ -516,8 +514,8 @@ public class ParentQueue extends AbstractCSQueue {
private void removeApplication(ApplicationId applicationId,
String user) {
+ writeLock.lock();
try {
- writeLock.lock();
--numApplications;
LOG.info("Application removed -" + " appId: " + applicationId + " user: "
@@ -854,8 +852,8 @@ public class ParentQueue extends AbstractCSQueue {
private void internalReleaseResource(Resource clusterResource,
FiCaSchedulerNode node, Resource releasedResource) {
+ writeLock.lock();
try {
- writeLock.lock();
super.releaseResource(clusterResource, releasedResource,
node.getPartition());
@@ -891,9 +889,8 @@ public class ParentQueue extends AbstractCSQueue {
@Override
public void updateClusterResource(Resource clusterResource,
ResourceLimits resourceLimits) {
+ writeLock.lock();
try {
- writeLock.lock();
-
// Update effective capacity in all parent queue.
Set<String> configuredNodelabels = csContext.getConfiguration()
.getConfiguredNodeLabels(getQueuePath());
@@ -1133,8 +1130,8 @@ public class ParentQueue extends AbstractCSQueue {
@Override
public List<CSQueue> getChildQueues() {
+ readLock.lock();
try {
- readLock.lock();
return new ArrayList<CSQueue>(childQueues);
} finally {
readLock.unlock();
@@ -1153,8 +1150,8 @@ public class ParentQueue extends AbstractCSQueue {
}
// Careful! Locking order is important!
+ writeLock.lock();
try {
- writeLock.lock();
FiCaSchedulerNode node = scheduler.getNode(
rmContainer.getContainer().getNodeId());
allocateResource(clusterResource,
@@ -1177,8 +1174,8 @@ public class ParentQueue extends AbstractCSQueue {
@Override
public void collectSchedulerApplications(
Collection<ApplicationAttemptId> apps) {
+ readLock.lock();
try {
- readLock.lock();
for (CSQueue queue : childQueues) {
queue.collectSchedulerApplications(apps);
}
@@ -1233,8 +1230,8 @@ public class ParentQueue extends AbstractCSQueue {
void allocateResource(Resource clusterResource,
Resource resource, String nodePartition) {
+ writeLock.lock();
try {
- writeLock.lock();
super.allocateResource(clusterResource, resource, nodePartition);
/**
@@ -1331,8 +1328,8 @@ public class ParentQueue extends AbstractCSQueue {
// Do not modify queue when allocation from reserved container
if (allocation.getAllocateFromReservedContainer() == null) {
+ writeLock.lock();
try {
- writeLock.lock();
// Book-keeping
// Note: Update headroom to account for current allocation too...
allocateResource(cluster, allocation.getAllocatedOrReservedResource(),
@@ -1355,8 +1352,8 @@ public class ParentQueue extends AbstractCSQueue {
@Override
public void stopQueue() {
+ this.writeLock.lock();
try {
- this.writeLock.lock();
if (getNumApplications() > 0) {
updateQueueState(QueueState.DRAINING);
} else {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
index 757002f..79afcdc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
@@ -81,8 +81,8 @@ public class PlanQueue extends AbstractManagedParentQueue {
@Override
public void reinitialize(CSQueue newlyParsedQueue,
Resource clusterResource) throws IOException {
+ writeLock.lock();
try {
- writeLock.lock();
// Sanity check
if (!(newlyParsedQueue instanceof PlanQueue) || !newlyParsedQueue
.getQueuePath().equals(getQueuePath())) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java
index 51b9fa8..c1b7157 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java
@@ -83,8 +83,8 @@ public class QueueCapacities {
}
private float _get(String label, CapacityType type) {
+ readLock.lock();
try {
- readLock.lock();
Capacities cap = capacitiesMap.get(label);
if (null == cap) {
return LABEL_DOESNT_EXIST_CAP;
@@ -96,8 +96,8 @@ public class QueueCapacities {
}
private void _set(String label, CapacityType type, float value) {
+ writeLock.lock();
try {
- writeLock.lock();
Capacities cap = capacitiesMap.get(label);
if (null == cap) {
cap = new Capacities();
@@ -277,8 +277,8 @@ public class QueueCapacities {
* configurable fields, and load new values
*/
public void clearConfigurableFields() {
+ writeLock.lock();
try {
- writeLock.lock();
for (String label : capacitiesMap.keySet()) {
_set(label, CapacityType.CAP, 0);
_set(label, CapacityType.MAX_CAP, 0);
@@ -291,8 +291,8 @@ public class QueueCapacities {
}
public Set<String> getExistingNodeLabels() {
+ readLock.lock();
try {
- readLock.lock();
return new HashSet<String>(capacitiesMap.keySet());
} finally {
readLock.unlock();
@@ -301,8 +301,8 @@ public class QueueCapacities {
@Override
public String toString() {
+ readLock.lock();
try {
- readLock.lock();
return this.capacitiesMap.toString();
} finally {
readLock.unlock();
@@ -310,8 +310,8 @@ public class QueueCapacities {
}
public Set<String> getNodePartitionsSet() {
+ readLock.lock();
try {
- readLock.lock();
return capacitiesMap.keySet();
} finally {
readLock.unlock();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
index 34f4aa1..d59c02b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
@@ -53,8 +53,8 @@ public class ReservationQueue extends AbstractAutoCreatedLeafQueue {
@Override
public void reinitialize(CSQueue newlyParsedQueue,
Resource clusterResource) throws IOException {
+ writeLock.lock();
try {
- writeLock.lock();
// Sanity check
if (!(newlyParsedQueue instanceof ReservationQueue) || !newlyParsedQueue
.getQueuePath().equals(getQueuePath())) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
index 960067e..470bb11 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
@@ -114,8 +114,8 @@ public class UsersManager implements AbstractUsersManager {
}
private void incUsageRatio(String label, float delta) {
+ writeLock.lock();
try {
- writeLock.lock();
float usage = 0f;
if (usageRatios.containsKey(label)) {
usage = usageRatios.get(label);
@@ -128,8 +128,8 @@ public class UsersManager implements AbstractUsersManager {
}
private float getUsageRatio(String label) {
+ readLock.lock();
try {
- readLock.lock();
Float f = usageRatios.get(label);
if (null == f) {
return 0.0f;
@@ -141,8 +141,8 @@ public class UsersManager implements AbstractUsersManager {
}
private void setUsageRatio(String label, float ratio) {
+ writeLock.lock();
try {
- writeLock.lock();
usageRatios.put(label, ratio);
} finally {
writeLock.unlock();
@@ -179,8 +179,8 @@ public class UsersManager implements AbstractUsersManager {
public float setAndUpdateUsageRatio(ResourceCalculator resourceCalculator,
Resource resource, String nodePartition) {
+ writeLock.lock();
try {
- writeLock.lock();
userUsageRatios.setUsageRatio(nodePartition, 0);
return updateUsageRatio(resourceCalculator, resource, nodePartition);
} finally {
@@ -190,8 +190,8 @@ public class UsersManager implements AbstractUsersManager {
public float updateUsageRatio(ResourceCalculator resourceCalculator,
Resource resource, String nodePartition) {
+ writeLock.lock();
try {
- writeLock.lock();
float delta;
float newRatio = Resources.ratio(resourceCalculator,
getUsed(nodePartition), resource);
@@ -358,8 +358,8 @@ public class UsersManager implements AbstractUsersManager {
// If latestVersionOfUsersState is negative due to overflow, ideally we need
// to reset it. This method is invoked from UsersManager and LeafQueue and
// all is happening within write/readLock. Below logic can help to set 0.
+ writeLock.lock();
try {
- writeLock.lock();
long value = latestVersionOfUsersState.incrementAndGet();
if (value < 0) {
@@ -395,8 +395,8 @@ public class UsersManager implements AbstractUsersManager {
* User Name
*/
public void removeUser(String userName) {
+ writeLock.lock();
try {
- writeLock.lock();
this.users.remove(userName);
// Remove user from active/non-active list as well.
@@ -417,8 +417,8 @@ public class UsersManager implements AbstractUsersManager {
* @return User object
*/
public User getUserAndAddIfAbsent(String userName) {
+ writeLock.lock();
try {
- writeLock.lock();
User u = getUser(userName);
if (null == u) {
u = new User(userName);
@@ -448,8 +448,8 @@ public class UsersManager implements AbstractUsersManager {
* @return an ArrayList of UserInfo objects who are active in this queue
*/
public ArrayList<UserInfo> getUsersInfo() {
+ readLock.lock();
try {
- readLock.lock();
ArrayList<UserInfo> usersToReturn = new ArrayList<UserInfo>();
for (Map.Entry<String, User> entry : getUsers().entrySet()) {
User user = entry.getValue();
@@ -494,8 +494,8 @@ public class UsersManager implements AbstractUsersManager {
Map<SchedulingMode, Resource> userLimitPerSchedulingMode;
+ writeLock.lock();
try {
- writeLock.lock();
userLimitPerSchedulingMode =
preComputedActiveUserLimit.get(nodePartition);
if (isRecomputeNeeded(schedulingMode, nodePartition, true)) {
@@ -553,8 +553,8 @@ public class UsersManager implements AbstractUsersManager {
Map<SchedulingMode, Resource> userLimitPerSchedulingMode;
+ writeLock.lock();
try {
- writeLock.lock();
userLimitPerSchedulingMode = preComputedAllUserLimit.get(nodePartition);
if (isRecomputeNeeded(schedulingMode, nodePartition, false)) {
// recompute
@@ -602,8 +602,8 @@ public class UsersManager implements AbstractUsersManager {
*/
private void setLocalVersionOfUsersState(String nodePartition,
SchedulingMode schedulingMode, boolean isActive) {
+ writeLock.lock();
try {
- writeLock.lock();
Map<String, Map<SchedulingMode, Long>> localVersionOfUsersState = (isActive)
? localVersionOfActiveUsersState
: localVersionOfAllUsersState;
@@ -626,8 +626,8 @@ public class UsersManager implements AbstractUsersManager {
*/
private long getLocalVersionOfUsersState(String nodePartition,
SchedulingMode schedulingMode, boolean isActive) {
+ this.readLock.lock();
try {
- this.readLock.lock();
Map<String, Map<SchedulingMode, Long>> localVersionOfUsersState = (isActive)
? localVersionOfActiveUsersState
: localVersionOfAllUsersState;
@@ -825,8 +825,8 @@ public class UsersManager implements AbstractUsersManager {
* Cluster Resource
*/
public void updateUsageRatio(String partition, Resource clusterResource) {
+ writeLock.lock();
try {
- writeLock.lock();
Resource resourceByLabel = labelManager.getResourceByLabel(partition,
clusterResource);
float consumed = 0;
@@ -852,9 +852,9 @@ public class UsersManager implements AbstractUsersManager {
@Override
public void activateApplication(String user, ApplicationId applicationId) {
- try {
- this.writeLock.lock();
+ this.writeLock.lock();
+ try {
User userDesc = getUser(user);
if (userDesc != null && userDesc.getActiveApplications() <= 0) {
return;
@@ -885,9 +885,9 @@ public class UsersManager implements AbstractUsersManager {
@Override
public void deactivateApplication(String user, ApplicationId applicationId) {
- try {
- this.writeLock.lock();
+ this.writeLock.lock();
+ try {
Set<ApplicationId> userApps = usersApplications.get(user);
if (userApps != null) {
if (userApps.remove(applicationId)) {
@@ -919,8 +919,8 @@ public class UsersManager implements AbstractUsersManager {
float sumActiveUsersTimesWeights() {
float count = 0.0f;
+ this.readLock.lock();
try {
- this.readLock.lock();
for (String u : activeUsersSet) {
count += getUser(u).getWeight();
}
@@ -932,8 +932,8 @@ public class UsersManager implements AbstractUsersManager {
float sumAllUsersTimesWeights() {
float count = 0.0f;
+ this.readLock.lock();
try {
- this.readLock.lock();
for (String u : users.keySet()) {
count += getUser(u).getWeight();
}
@@ -944,9 +944,8 @@ public class UsersManager implements AbstractUsersManager {
}
private void updateActiveUsersResourceUsage(String userName) {
+ this.writeLock.lock();
try {
- this.writeLock.lock();
-
// For UT case: We might need to add the user to users list.
User user = getUserAndAddIfAbsent(userName);
ResourceUsage resourceUsage = user.getResourceUsage();
@@ -983,8 +982,8 @@ public class UsersManager implements AbstractUsersManager {
}
private void updateNonActiveUsersResourceUsage(String userName) {
+ this.writeLock.lock();
try {
- this.writeLock.lock();
// For UT case: We might need to add the user to users list.
User user = getUser(userName);
@@ -1052,8 +1051,8 @@ public class UsersManager implements AbstractUsersManager {
*/
public User updateUserResourceUsage(String userName, Resource resource,
String nodePartition, boolean isAllocate) {
+ this.writeLock.lock();
try {
- this.writeLock.lock();
// TODO, should use getUser, use this method just to avoid UT failure
// which is caused by wrong invoking order, will fix UT separately
@@ -1099,8 +1098,8 @@ public class UsersManager implements AbstractUsersManager {
}
public void updateUserWeights() {
+ this.writeLock.lock();
try {
- this.writeLock.lock();
for (Map.Entry<String, User> ue : users.entrySet()) {
ue.getValue().setWeight(getUserWeightFromQueue(ue.getKey()));
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java
index 76fcd4a..ae07087 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java
@@ -43,8 +43,8 @@ public class PreemptionManager {
}
public void refreshQueues(CSQueue parent, CSQueue current) {
+ writeLock.lock();
try {
- writeLock.lock();
PreemptableQueue parentEntity = null;
if (parent != null) {
parentEntity = entities.get(parent.getQueueName());
@@ -67,8 +67,8 @@ public class PreemptionManager {
}
public void addKillableContainer(KillableContainer container) {
+ writeLock.lock();
try {
- writeLock.lock();
PreemptableQueue entity = entities.get(container.getLeafQueueName());
if (null != entity) {
entity.addKillableContainer(container);
@@ -80,8 +80,8 @@ public class PreemptionManager {
}
public void removeKillableContainer(KillableContainer container) {
+ writeLock.lock();
try {
- writeLock.lock();
PreemptableQueue entity = entities.get(container.getLeafQueueName());
if (null != entity) {
entity.removeKillableContainer(container);
@@ -106,8 +106,8 @@ public class PreemptionManager {
@VisibleForTesting
public Map<ContainerId, RMContainer> getKillableContainersMap(
String queueName, String partition) {
+ readLock.lock();
try {
- readLock.lock();
PreemptableQueue entity = entities.get(queueName);
if (entity != null) {
Map<ContainerId, RMContainer> containers =
@@ -129,8 +129,8 @@ public class PreemptionManager {
}
public Resource getKillableResource(String queueName, String partition) {
+ readLock.lock();
try {
- readLock.lock();
PreemptableQueue entity = entities.get(queueName);
if (entity != null) {
Resource res = entity.getTotalKillableResources().get(partition);
@@ -147,8 +147,8 @@ public class PreemptionManager {
}
public Map<String, PreemptableQueue> getShallowCopyOfPreemptableQueues() {
+ readLock.lock();
try {
- readLock.lock();
Map<String, PreemptableQueue> map = new HashMap<>();
for (Map.Entry<String, PreemptableQueue> entry : entities.entrySet()) {
String key = entry.getKey();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java
index faa6e6f..b1d3f74 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java
@@ -202,8 +202,8 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
new HashMap<String, Float>();
private float getAbsoluteActivatedChildQueueCapacity(String nodeLabel) {
+ readLock.lock();
try {
- readLock.lock();
Float totalActivatedCapacity = getAbsActivatedChildQueueCapacityByLabel(
nodeLabel);
if (totalActivatedCapacity != null) {
@@ -218,8 +218,8 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
private void incAbsoluteActivatedChildCapacity(String nodeLabel,
float childQueueCapacity) {
+ writeLock.lock();
try {
- writeLock.lock();
Float activatedChildCapacity = getAbsActivatedChildQueueCapacityByLabel(
nodeLabel);
if (activatedChildCapacity != null) {
@@ -236,8 +236,8 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
private void decAbsoluteActivatedChildCapacity(String nodeLabel,
float childQueueCapacity) {
+ writeLock.lock();
try {
- writeLock.lock();
Float activatedChildCapacity = getAbsActivatedChildQueueCapacityByLabel(
nodeLabel);
if (activatedChildCapacity != null) {
@@ -360,8 +360,8 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
//synch/add missing leaf queue(s) if any to state
updateLeafQueueState();
+ readLock.lock();
try {
- readLock.lock();
List<QueueManagementChange> queueManagementChanges = new ArrayList<>();
List<FiCaSchedulerApp> pendingApps = getSortedPendingApplications();
@@ -483,8 +483,8 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
@VisibleForTesting
void updateLeafQueueState() {
+ writeLock.lock();
try {
- writeLock.lock();
Set<String> newPartitions = new HashSet<>();
Set<String> newQueues = new HashSet<>();
@@ -570,8 +570,8 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
@VisibleForTesting
public boolean isActive(final AutoCreatedLeafQueue leafQueue,
String nodeLabel) throws SchedulerDynamicEditException {
+ readLock.lock();
try {
- readLock.lock();
LeafQueueStatePerPartition leafQueueStatus = getLeafQueueState(leafQueue,
nodeLabel);
return leafQueueStatus.isActive();
@@ -649,8 +649,8 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
public void commitQueueManagementChanges(
List<QueueManagementChange> queueManagementChanges)
throws SchedulerDynamicEditException {
+ writeLock.lock();
try {
- writeLock.lock();
for (QueueManagementChange queueManagementChange :
queueManagementChanges) {
AutoCreatedLeafQueueConfig updatedQueueTemplate =
@@ -695,8 +695,8 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
private void activate(final AbstractAutoCreatedLeafQueue leafQueue,
String nodeLabel) throws SchedulerDynamicEditException {
+ writeLock.lock();
try {
- writeLock.lock();
getLeafQueueState(leafQueue, nodeLabel).activate();
parentQueueState.incAbsoluteActivatedChildCapacity(nodeLabel,
leafQueueTemplateCapacities.getAbsoluteCapacity(nodeLabel));
@@ -707,8 +707,8 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
private void deactivate(final AbstractAutoCreatedLeafQueue leafQueue,
String nodeLabel) throws SchedulerDynamicEditException {
+ writeLock.lock();
try {
- writeLock.lock();
getLeafQueueState(leafQueue, nodeLabel).deactivate();
parentQueueState.decAbsoluteActivatedChildCapacity(nodeLabel,
@@ -765,9 +765,8 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
.getClass());
}
+ writeLock.lock();
try {
- writeLock.lock();
-
QueueCapacities capacities = new QueueCapacities(false);
for (String nodeLabel : leafQueueTemplateNodeLabels) {
if (!leafQueueState.createLeafQueueStateIfNotExists(leafQueue,
@@ -816,8 +815,8 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
@VisibleForTesting
LeafQueueStatePerPartition getLeafQueueState(LeafQueue queue,
String partition) throws SchedulerDynamicEditException {
+ readLock.lock();
try {
- readLock.lock();
String queueName = queue.getQueueName();
if (!leafQueueState.containsLeafQueue(queueName, partition)) {
throw new SchedulerDynamicEditException(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 26a8aff..2010711 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -201,8 +201,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
public boolean containerCompleted(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event,
String partition) {
+ writeLock.lock();
try {
- writeLock.lock();
ContainerId containerId = rmContainer.getContainerId();
// Remove from the list of containers
@@ -246,8 +246,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
public RMContainer allocate(FiCaSchedulerNode node,
SchedulerRequestKey schedulerKey, Container container) {
+ readLock.lock();
try {
- readLock.lock();
if (isStopped) {
return null;
@@ -438,8 +438,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
ContainerRequest containerRequest = null;
boolean reReservation = false;
+ readLock.lock();
try {
- readLock.lock();
// First make sure no container in release list in final state
if (anyContainerInFinalState(request)) {
@@ -561,8 +561,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
FiCaSchedulerNode> request, boolean updatePending) {
boolean reReservation = false;
+ writeLock.lock();
try {
- writeLock.lock();
// If we allocated something
if (request.anythingAllocatedOrReserved()) {
@@ -693,8 +693,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
public boolean unreserve(SchedulerRequestKey schedulerKey,
FiCaSchedulerNode node, RMContainer rmContainer) {
+ writeLock.lock();
try {
- writeLock.lock();
// Done with the reservation?
if (internalUnreserve(node, schedulerKey)) {
node.unreserveResource(this);
@@ -749,8 +749,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
}
public Map<String, Resource> getTotalPendingRequestsPerPartition() {
+ readLock.lock();
try {
- readLock.lock();
Map<String, Resource> ret = new HashMap<>();
for (SchedulerRequestKey schedulerKey : appSchedulingInfo
@@ -781,8 +781,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
}
public void markContainerForPreemption(ContainerId cont) {
+ writeLock.lock();
try {
- writeLock.lock();
// ignore already completed containers
if (liveContainers.containsKey(cont)) {
containersToPreempt.add(cont);
@@ -804,8 +804,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
*/
public Allocation getAllocation(ResourceCalculator resourceCalculator,
Resource clusterResource, Resource minimumAllocation) {
+ writeLock.lock();
try {
- writeLock.lock();
Set<ContainerId> currentContPreemption = Collections.unmodifiableSet(
new HashSet<ContainerId>(containersToPreempt));
containersToPreempt.clear();
@@ -873,8 +873,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
public void setHeadroomProvider(
CapacityHeadroomProvider headroomProvider) {
+ writeLock.lock();
try {
- writeLock.lock();
this.headroomProvider = headroomProvider;
} finally {
writeLock.unlock();
@@ -883,8 +883,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
@Override
public Resource getHeadroom() {
+ readLock.lock();
try {
- readLock.lock();
if (headroomProvider != null) {
return headroomProvider.getHeadroom();
}
@@ -898,8 +898,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
@Override
public void transferStateFromPreviousAttempt(
SchedulerApplicationAttempt appAttempt) {
+ writeLock.lock();
try {
- writeLock.lock();
super.transferStateFromPreviousAttempt(appAttempt);
this.headroomProvider = ((FiCaSchedulerApp) appAttempt).headroomProvider;
} finally {
@@ -926,8 +926,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
@VisibleForTesting
public RMContainer findNodeToUnreserve(FiCaSchedulerNode node,
SchedulerRequestKey schedulerKey, Resource minimumUnreservedResource) {
+ readLock.lock();
try {
- readLock.lock();
// need to unreserve some other container first
NodeId idToUnreserve = getNodeIdToUnreserve(schedulerKey,
minimumUnreservedResource, rc);
@@ -1108,11 +1108,11 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
*/
@Override
public ApplicationResourceUsageReport getResourceUsageReport() {
+ writeLock.lock();
try {
// Use write lock here because
// SchedulerApplicationAttempt#getResourceUsageReport updated fields
// TODO: improve this
- writeLock.lock();
ApplicationResourceUsageReport report = super.getResourceUsageReport();
Resource cluster = rmContext.getScheduler().getClusterResource();
Resource totalPartitionRes =
@@ -1175,8 +1175,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
*/
public boolean moveReservation(RMContainer reservedContainer,
FiCaSchedulerNode sourceNode, FiCaSchedulerNode targetNode) {
+ writeLock.lock();
try {
- writeLock.lock();
if (!sourceNode.getPartition().equals(targetNode.getPartition())) {
if (LOG.isDebugEnabled()) {
LOG.debug(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java
index bf04672..9436138 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java
@@ -83,8 +83,8 @@ public class MemoryPlacementConstraintManager
Map<Set<String>, PlacementConstraint> constraintMap) {
// Check if app already exists. If not, prepare its constraint map.
Map<String, PlacementConstraint> constraintsForApp = new HashMap<>();
+ readLock.lock();
try {
- readLock.lock();
if (appConstraints.get(appId) != null) {
LOG.warn("Application {} has already been registered.", appId);
return;
@@ -109,8 +109,8 @@ public class MemoryPlacementConstraintManager
appId);
}
// Update appConstraints.
+ writeLock.lock();
try {
- writeLock.lock();
appConstraints.put(appId, constraintsForApp);
} finally {
writeLock.unlock();
@@ -120,8 +120,8 @@ public class MemoryPlacementConstraintManager
@Override
public void addConstraint(ApplicationId appId, Set<String> sourceTags,
PlacementConstraint placementConstraint, boolean replace) {
+ writeLock.lock();
try {
- writeLock.lock();
Map<String, PlacementConstraint> constraintsForApp =
appConstraints.get(appId);
if (constraintsForApp == null) {
@@ -140,8 +140,8 @@ public class MemoryPlacementConstraintManager
@Override
public void addGlobalConstraint(Set<String> sourceTags,
PlacementConstraint placementConstraint, boolean replace) {
+ writeLock.lock();
try {
- writeLock.lock();
addConstraintToMap(globalConstraints, sourceTags, placementConstraint,
replace);
} finally {
@@ -181,8 +181,8 @@ public class MemoryPlacementConstraintManager
@Override
public Map<Set<String>, PlacementConstraint> getConstraints(
ApplicationId appId) {
+ readLock.lock();
try {
- readLock.lock();
if (appConstraints.get(appId) == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Application {} is not registered in the Placement "
@@ -212,8 +212,8 @@ public class MemoryPlacementConstraintManager
return null;
}
String sourceTag = getValidSourceTag(sourceTags);
+ readLock.lock();
try {
- readLock.lock();
if (appConstraints.get(appId) == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Application {} is not registered in the Placement "
@@ -235,8 +235,8 @@ public class MemoryPlacementConstraintManager
return null;
}
String sourceTag = getValidSourceTag(sourceTags);
+ readLock.lock();
try {
- readLock.lock();
return globalConstraints.get(sourceTag);
} finally {
readLock.unlock();
@@ -284,8 +284,8 @@ public class MemoryPlacementConstraintManager
@Override
public void unregisterApplication(ApplicationId appId) {
+ writeLock.lock();
try {
- writeLock.lock();
appConstraints.remove(appId);
} finally {
writeLock.unlock();
@@ -298,8 +298,8 @@ public class MemoryPlacementConstraintManager
return;
}
String sourceTag = getValidSourceTag(sourceTags);
+ writeLock.lock();
try {
- writeLock.lock();
globalConstraints.remove(sourceTag);
} finally {
writeLock.unlock();
@@ -308,8 +308,8 @@ public class MemoryPlacementConstraintManager
@Override
public int getNumRegisteredApplications() {
+ readLock.lock();
try {
- readLock.lock();
return appConstraints.size();
} finally {
readLock.unlock();
@@ -318,8 +318,8 @@ public class MemoryPlacementConstraintManager
@Override
public int getNumGlobalConstraints() {
+ readLock.lock();
try {
- readLock.lock();
return globalConstraints.size();
} finally {
readLock.unlock();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index dcc880d..d5a1f4c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -132,8 +132,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
void containerCompleted(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event) {
+ writeLock.lock();
try {
- writeLock.lock();
Container container = rmContainer.getContainer();
ContainerId containerId = container.getId();
@@ -182,8 +182,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
private void unreserveInternal(
SchedulerRequestKey schedulerKey, FSSchedulerNode node) {
+ writeLock.lock();
try {
- writeLock.lock();
Map<NodeId, RMContainer> reservedContainers = this.reservedContainers.get(
schedulerKey);
RMContainer reservedContainer = reservedContainers.remove(
@@ -285,8 +285,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
return NodeType.OFF_SWITCH;
}
+ writeLock.lock();
try {
- writeLock.lock();
// Default level is NODE_LOCAL
if (!allowedLocalityLevel.containsKey(schedulerKey)) {
@@ -355,8 +355,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
return NodeType.OFF_SWITCH;
}
+ writeLock.lock();
try {
- writeLock.lock();
// default level is NODE_LOCAL
if (!allowedLocalityLevel.containsKey(schedulerKey)) {
@@ -426,8 +426,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
RMContainer rmContainer;
Container container;
+ writeLock.lock();
try {
- writeLock.lock();
// Update allowed locality level
NodeType allowed = allowedLocalityLevel.get(schedulerKey);
if (allowed != null) {
@@ -499,8 +499,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
void resetAllowedLocalityLevel(
SchedulerRequestKey schedulerKey, NodeType level) {
NodeType old;
+ writeLock.lock();
try {
- writeLock.lock();
old = allowedLocalityLevel.put(schedulerKey, level);
} finally {
writeLock.unlock();
@@ -665,9 +665,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
@Override
public synchronized void recoverContainer(SchedulerNode node,
RMContainer rmContainer) {
+ writeLock.lock();
try {
- writeLock.lock();
-
super.recoverContainer(node, rmContainer);
if (!rmContainer.getState().equals(RMContainerState.COMPLETED)) {
@@ -777,8 +776,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
String rackName =
node.getRackName() == null ? "NULL" : node.getRackName();
+ writeLock.lock();
try {
- writeLock.lock();
Set<String> rackReservations = reservations.get(rackName);
if (rackReservations == null) {
rackReservations = new HashSet<>();
@@ -794,8 +793,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
String rackName =
node.getRackName() == null ? "NULL" : node.getRackName();
+ writeLock.lock();
try {
- writeLock.lock();
Set<String> rackReservations = reservations.get(rackName);
if (rackReservations != null) {
rackReservations.remove(node.getNodeName());
@@ -964,8 +963,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
// For each priority, see if we can schedule a node local, rack local
// or off-switch request. Rack of off-switch requests may be delayed
// (not scheduled) in order to promote better locality.
+ writeLock.lock();
try {
- writeLock.lock();
// TODO (wandga): All logics in this method should be added to
// SchedulerPlacement#canDelayTo which is independent from scheduler.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java
index 422f6db..05a5d7b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java
@@ -54,9 +54,9 @@ public class FifoAppAttempt extends FiCaSchedulerApp {
public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
SchedulerRequestKey schedulerKey, Container container) {
- try {
- writeLock.lock();
+ writeLock.lock();
+ try {
if (isStopped) {
return null;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
index f8ccf1d..d0677c3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
@@ -155,9 +155,9 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
public PendingAskUpdateResult updatePendingAsk(
Collection<ResourceRequest> requests,
boolean recoverPreemptedRequestForAContainer) {
- try {
- this.writeLock.lock();
+ this.writeLock.lock();
+ try {
PendingAskUpdateResult updateResult = null;
// Update resource requests
@@ -228,8 +228,8 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
@Override
public PendingAsk getPendingAsk(String resourceName) {
+ readLock.lock();
try {
- readLock.lock();
ResourceRequest request = getResourceRequest(resourceName);
if (null == request) {
return PendingAsk.ZERO;
@@ -245,8 +245,8 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
@Override
public int getOutstandingAsksCount(String resourceName) {
+ readLock.lock();
try {
- readLock.lock();
ResourceRequest request = getResourceRequest(resourceName);
if (null == request) {
return 0;
@@ -353,8 +353,8 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
@Override
public boolean canAllocate(NodeType type, SchedulerNode node) {
+ readLock.lock();
try {
- readLock.lock();
ResourceRequest r = resourceRequestMap.get(
ResourceRequest.ANY);
if (r == null || r.getNumContainers() <= 0) {
@@ -381,8 +381,8 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
@Override
public boolean canDelayTo(String resourceName) {
+ readLock.lock();
try {
- readLock.lock();
ResourceRequest request = getResourceRequest(resourceName);
return request == null || request.getRelaxLocality();
} finally {
@@ -432,8 +432,8 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
@Override
public ContainerRequest allocate(SchedulerRequestKey schedulerKey,
NodeType type, SchedulerNode node) {
+ writeLock.lock();
try {
- writeLock.lock();
List<ResourceRequest> resourceRequests = new ArrayList<>();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java
index a215a89..f4da4d0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java
@@ -363,8 +363,8 @@ public class SingleConstraintAppPlacementAllocator<N extends SchedulerNode>
@Override
public boolean canAllocate(NodeType type, SchedulerNode node) {
+ readLock.lock();
try {
- readLock.lock();
return checkCardinalityAndPending(node);
} finally {
readLock.unlock();
@@ -411,8 +411,8 @@ public class SingleConstraintAppPlacementAllocator<N extends SchedulerNode>
@Override
public void showRequests() {
+ readLock.lock();
try {
- readLock.lock();
if (schedulingRequest != null) {
LOG.info(schedulingRequest.toString());
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java
index 6df88ce..f702c71 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java
@@ -191,8 +191,8 @@ public class NMTokenSecretManagerInRM extends BaseNMTokenSecretManager {
public NMToken createAndGetNMToken(String applicationSubmitter,
ApplicationAttemptId appAttemptId, Container container) {
+ this.writeLock.lock();
try {
- this.writeLock.lock();
HashSet<NodeId> nodeSet = this.appAttemptToNodeKeyMap.get(appAttemptId);
NMToken nmToken = null;
if (nodeSet != null) {
@@ -213,8 +213,8 @@ public class NMTokenSecretManagerInRM extends BaseNMTokenSecretManager {
}
public void registerApplicationAttempt(ApplicationAttemptId appAttemptId) {
+ this.writeLock.lock();
try {
- this.writeLock.lock();
this.appAttemptToNodeKeyMap.put(appAttemptId, new HashSet<NodeId>());
} finally {
this.writeLock.unlock();
@@ -225,8 +225,8 @@ public class NMTokenSecretManagerInRM extends BaseNMTokenSecretManager {
@VisibleForTesting
public boolean isApplicationAttemptRegistered(
ApplicationAttemptId appAttemptId) {
+ this.readLock.lock();
try {
- this.readLock.lock();
return this.appAttemptToNodeKeyMap.containsKey(appAttemptId);
} finally {
this.readLock.unlock();
@@ -237,8 +237,8 @@ public class NMTokenSecretManagerInRM extends BaseNMTokenSecretManager {
@VisibleForTesting
public boolean isApplicationAttemptNMTokenPresent(
ApplicationAttemptId appAttemptId, NodeId nodeId) {
+ this.readLock.lock();
try {
- this.readLock.lock();
HashSet<NodeId> nodes = this.appAttemptToNodeKeyMap.get(appAttemptId);
if (nodes != null && nodes.contains(nodeId)) {
return true;
@@ -251,8 +251,8 @@ public class NMTokenSecretManagerInRM extends BaseNMTokenSecretManager {
}
public void unregisterApplicationAttempt(ApplicationAttemptId appAttemptId) {
+ this.writeLock.lock();
try {
- this.writeLock.lock();
this.appAttemptToNodeKeyMap.remove(appAttemptId);
} finally {
this.writeLock.unlock();
@@ -265,8 +265,8 @@ public class NMTokenSecretManagerInRM extends BaseNMTokenSecretManager {
* @param nodeId
*/
public void removeNodeKey(NodeId nodeId) {
+ this.writeLock.lock();
try {
- this.writeLock.lock();
Iterator<HashSet<NodeId>> appNodeKeySetIterator =
this.appAttemptToNodeKeyMap.values().iterator();
while (appNodeKeySetIterator.hasNext()) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java
index 70bb66e..186742b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java
@@ -124,8 +124,8 @@ public class VolumeImpl implements Volume {
@Override
public VolumeState getVolumeState() {
+ readLock.lock();
try {
- readLock.lock();
return stateMachine.getCurrentState();
} finally {
readLock.unlock();
@@ -134,8 +134,8 @@ public class VolumeImpl implements Volume {
@Override
public VolumeId getVolumeId() {
+ readLock.lock();
try {
- readLock.lock();
return this.volumeId;
} finally {
readLock.unlock();
@@ -184,8 +184,8 @@ public class VolumeImpl implements Volume {
@Override
public void handle(VolumeEvent event) {
+ this.writeLock.lock();
try {
- this.writeLock.lock();
VolumeId volumeId = event.getVolumeId();
if (volumeId == null) {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org