You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2014/10/16 03:33:57 UTC
[4/4] git commit: YARN-2496. Enhanced Capacity Scheduler to have
basic support for allocating resources based on node-labels. Contributed by
Wangda Tan. YARN-2500. Enhanced ResourceManager to support schedulers
allocating resources based on node-labels. C
YARN-2496. Enhanced Capacity Scheduler to have basic support for allocating resources based on node-labels. Contributed by Wangda Tan.
YARN-2500. Enhanced ResourceManager to support schedulers allocating resources based on node-labels. Contributed by Wangda Tan.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f2ea555a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f2ea555a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f2ea555a
Branch: refs/heads/trunk
Commit: f2ea555ac6c06a3f2f6559731f48711fff05d3f1
Parents: 466f087
Author: Vinod Kumar Vavilapalli <vi...@apache.org>
Authored: Wed Oct 15 18:33:06 2014 -0700
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Wed Oct 15 18:33:06 2014 -0700
----------------------------------------------------------------------
.../hadoop/yarn/sls/nodemanager/NodeInfo.java | 5 +
.../yarn/sls/scheduler/RMNodeWrapper.java | 5 +
hadoop-yarn-project/CHANGES.txt | 6 +
.../dev-support/findbugs-exclude.xml | 17 +
.../ApplicationMasterService.java | 42 +-
.../server/resourcemanager/RMAppManager.java | 32 +-
.../yarn/server/resourcemanager/RMContext.java | 5 +
.../server/resourcemanager/RMContextImpl.java | 12 +
.../server/resourcemanager/RMServerUtils.java | 19 +-
.../server/resourcemanager/ResourceManager.java | 11 +-
.../CapacitySchedulerPlanFollower.java | 17 +-
.../server/resourcemanager/rmapp/RMAppImpl.java | 8 +-
.../rmapp/attempt/RMAppAttemptImpl.java | 39 +-
.../server/resourcemanager/rmnode/RMNode.java | 8 +
.../resourcemanager/rmnode/RMNodeImpl.java | 8 +
.../server/resourcemanager/scheduler/Queue.java | 19 +
.../scheduler/SchedulerUtils.java | 122 +++-
.../scheduler/capacity/AbstractCSQueue.java | 448 ++++++++++++++
.../scheduler/capacity/CSQueue.java | 61 +-
.../scheduler/capacity/CSQueueUtils.java | 57 +-
.../scheduler/capacity/CapacityScheduler.java | 48 +-
.../CapacitySchedulerConfiguration.java | 148 ++++-
.../scheduler/capacity/LeafQueue.java | 596 ++++++++++---------
.../scheduler/capacity/ParentQueue.java | 391 +++++-------
.../scheduler/capacity/PlanQueue.java | 8 +-
.../scheduler/capacity/ReservationQueue.java | 2 +-
.../resourcemanager/scheduler/fair/FSQueue.java | 13 +
.../scheduler/fifo/FifoScheduler.java | 13 +
.../server/resourcemanager/Application.java | 1 +
.../yarn/server/resourcemanager/MockAM.java | 35 +-
.../yarn/server/resourcemanager/MockNodes.java | 7 +-
.../yarn/server/resourcemanager/MockRM.java | 37 +-
.../server/resourcemanager/RMHATestBase.java | 5 +-
.../server/resourcemanager/TestAppManager.java | 4 +-
.../resourcemanager/TestApplicationACLs.java | 3 +-
.../resourcemanager/TestClientRMService.java | 54 +-
.../yarn/server/resourcemanager/TestRMHA.java | 2 +-
.../TestWorkPreservingRMRestart.java | 2 +-
.../reservation/ReservationSystemTestUtil.java | 30 +
.../rmapp/TestRMAppTransitions.java | 11 +-
.../attempt/TestRMAppAttemptTransitions.java | 66 +-
.../scheduler/TestSchedulerUtils.java | 279 +++++++--
.../capacity/TestApplicationLimits.java | 11 +-
.../scheduler/capacity/TestCSQueueUtils.java | 28 +-
.../capacity/TestCapacityScheduler.java | 28 +-
.../scheduler/capacity/TestChildQueueOrder.java | 5 +-
.../capacity/TestContainerAllocation.java | 460 ++++++++++++++
.../scheduler/capacity/TestLeafQueue.java | 37 +-
.../scheduler/capacity/TestParentQueue.java | 5 +-
.../scheduler/capacity/TestQueueMappings.java | 10 +-
.../scheduler/capacity/TestQueueParsing.java | 267 ++++++++-
.../capacity/TestReservationQueue.java | 9 +-
.../scheduler/capacity/TestReservations.java | 30 +-
.../scheduler/capacity/TestUtils.java | 31 +-
.../scheduler/fair/FairSchedulerTestBase.java | 2 +-
.../scheduler/fair/TestFairScheduler.java | 2 +-
.../resourcemanager/webapp/TestRMWebApp.java | 42 +-
57 files changed, 2867 insertions(+), 796 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
index 029fa87..fdddcf4 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.sls.nodemanager;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -159,6 +160,10 @@ public class NodeInfo {
return null;
}
+ @Override
+ public Set<String> getNodeLabels() {
+ return null;
+ }
}
public static RMNode newNodeInfo(String rackName, String hostName,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
index 7eca66f..3b185ae 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode
import java.util.Collections;
import java.util.List;
+import java.util.Set;
@Private
@Unstable
@@ -147,4 +148,8 @@ public class RMNodeWrapper implements RMNode {
return node.getNodeManagerVersion();
}
+ @Override
+ public Set<String> getNodeLabels() {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 326b554..97fea49 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -165,6 +165,12 @@ Release 2.6.0 - UNRELEASED
YARN-2656. Made RM web services authentication filter support proxy user.
(Varun Vasudev and Zhijie Shen via zjshen)
+ YARN-2496. Enhanced Capacity Scheduler to have basic support for allocating
+ resources based on node-labels. (Wangda Tan via vinodkv)
+
+ YARN-2500. Ehnaced ResourceManager to support schedulers allocating resources
+ based on node-labels. (Wangda Tan via vinodkv)
+
IMPROVEMENTS
YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 6e82af0..e6da24c 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -188,6 +188,23 @@
</Or>
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
+ <Match>
+ <Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue" />
+ <Or>
+ <Field name="absoluteCapacity" />
+ <Field name="absoluteMaxCapacity" />
+ <Field name="acls" />
+ <Field name="capacity" />
+ <Field name="maximumCapacity" />
+ <Field name="state" />
+ <Field name="labelManager" />
+ <Field name="defaultLabelExpression" />
+ <Field name="accessibleLabels" />
+ <Field name="absoluteNodeLabelCapacities" />
+ <Field name="reservationsContinueLooking" />
+ </Or>
+ <Bug pattern="IS2_INCONSISTENT_SYNC" />
+ </Match>
<!-- Inconsistent sync warning - scheduleAsynchronously is only initialized once and never changed -->
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler" />
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index 707cf1b..35baa44 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NMToken;
@@ -254,13 +255,13 @@ public class ApplicationMasterService extends AbstractService implements
if (hasApplicationMasterRegistered(applicationAttemptId)) {
String message =
"Application Master is already registered : "
- + applicationAttemptId.getApplicationId();
+ + appID;
LOG.warn(message);
RMAuditLogger.logFailure(
this.rmContext.getRMApps()
- .get(applicationAttemptId.getApplicationId()).getUser(),
+ .get(appID).getUser(),
AuditConstants.REGISTER_AM, "", "ApplicationMasterService", message,
- applicationAttemptId.getApplicationId(), applicationAttemptId);
+ appID, applicationAttemptId);
throw new InvalidApplicationMasterRequestException(message);
}
@@ -340,6 +341,7 @@ public class ApplicationMasterService extends AbstractService implements
ApplicationAttemptId applicationAttemptId =
authorizeRequest().getApplicationAttemptId();
+ ApplicationId appId = applicationAttemptId.getApplicationId();
AllocateResponseLock lock = responseMap.get(applicationAttemptId);
if (lock == null) {
@@ -351,13 +353,13 @@ public class ApplicationMasterService extends AbstractService implements
if (!hasApplicationMasterRegistered(applicationAttemptId)) {
String message =
"Application Master is trying to unregister before registering for: "
- + applicationAttemptId.getApplicationId();
+ + appId;
LOG.error(message);
RMAuditLogger.logFailure(
this.rmContext.getRMApps()
- .get(applicationAttemptId.getApplicationId()).getUser(),
+ .get(appId).getUser(),
AuditConstants.UNREGISTER_AM, "", "ApplicationMasterService",
- message, applicationAttemptId.getApplicationId(),
+ message, appId,
applicationAttemptId);
throw new ApplicationMasterNotRegisteredException(message);
}
@@ -365,7 +367,7 @@ public class ApplicationMasterService extends AbstractService implements
this.amLivelinessMonitor.receivedPing(applicationAttemptId);
RMApp rmApp =
- rmContext.getRMApps().get(applicationAttemptId.getApplicationId());
+ rmContext.getRMApps().get(appId);
if (rmApp.isAppFinalStateStored()) {
return FinishApplicationMasterResponse.newInstance(true);
@@ -418,6 +420,7 @@ public class ApplicationMasterService extends AbstractService implements
ApplicationAttemptId appAttemptId =
amrmTokenIdentifier.getApplicationAttemptId();
+ ApplicationId applicationId = appAttemptId.getApplicationId();
this.amLivelinessMonitor.receivedPing(appAttemptId);
@@ -432,14 +435,14 @@ public class ApplicationMasterService extends AbstractService implements
if (!hasApplicationMasterRegistered(appAttemptId)) {
String message =
"Application Master is not registered for known application: "
- + appAttemptId.getApplicationId()
+ + applicationId
+ ". Let AM resync.";
LOG.info(message);
RMAuditLogger.logFailure(
- this.rmContext.getRMApps().get(appAttemptId.getApplicationId())
+ this.rmContext.getRMApps().get(applicationId)
.getUser(), AuditConstants.REGISTER_AM, "",
"ApplicationMasterService", message,
- appAttemptId.getApplicationId(),
+ applicationId,
appAttemptId);
return resync;
}
@@ -481,11 +484,22 @@ public class ApplicationMasterService extends AbstractService implements
List<String> blacklistRemovals =
(blacklistRequest != null) ?
blacklistRequest.getBlacklistRemovals() : Collections.EMPTY_LIST;
-
+ RMApp app =
+ this.rmContext.getRMApps().get(applicationId);
+
+ // set label expression for Resource Requests
+ ApplicationSubmissionContext asc = app.getApplicationSubmissionContext();
+ for (ResourceRequest req : ask) {
+ if (null == req.getNodeLabelExpression()) {
+ req.setNodeLabelExpression(asc.getNodeLabelExpression());
+ }
+ }
+
// sanity check
try {
RMServerUtils.validateResourceRequests(ask,
- rScheduler.getMaximumResourceCapability());
+ rScheduler.getMaximumResourceCapability(), app.getQueue(),
+ rScheduler);
} catch (InvalidResourceRequestException e) {
LOG.warn("Invalid resource ask by application " + appAttemptId, e);
throw e;
@@ -498,8 +512,6 @@ public class ApplicationMasterService extends AbstractService implements
throw e;
}
- RMApp app =
- this.rmContext.getRMApps().get(appAttemptId.getApplicationId());
// In the case of work-preserving AM restart, it's possible for the
// AM to release containers from the earlier attempt.
if (!app.getApplicationSubmissionContext()
@@ -582,7 +594,7 @@ public class ApplicationMasterService extends AbstractService implements
.toString(), amrmToken.getPassword(), amrmToken.getService()
.toString()));
LOG.info("The AMRMToken has been rolled-over. Send new AMRMToken back"
- + " to application: " + appAttemptId.getApplicationId());
+ + " to application: " + applicationId);
}
/*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index 1d672e5..6e1b925 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -343,7 +343,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
long submitTime, String user)
throws YarnException {
ApplicationId applicationId = submissionContext.getApplicationId();
- validateResourceRequest(submissionContext);
+ ResourceRequest amReq = validateAndCreateResourceRequest(submissionContext);
// Create RMApp
RMAppImpl application =
new RMAppImpl(applicationId, rmContext, this.conf,
@@ -351,7 +351,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
submissionContext.getQueue(),
submissionContext, this.scheduler, this.masterService,
submitTime, submissionContext.getApplicationType(),
- submissionContext.getApplicationTags());
+ submissionContext.getApplicationTags(), amReq);
// Concurrent app submissions with same applicationId will fail here
// Concurrent app submissions with different applicationIds will not
@@ -373,7 +373,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
return application;
}
- private void validateResourceRequest(
+ private ResourceRequest validateAndCreateResourceRequest(
ApplicationSubmissionContext submissionContext)
throws InvalidResourceRequestException {
// Validation of the ApplicationSubmissionContext needs to be completed
@@ -383,18 +383,36 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
// Check whether AM resource requirements are within required limits
if (!submissionContext.getUnmanagedAM()) {
- ResourceRequest amReq = BuilderUtils.newResourceRequest(
- RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
- submissionContext.getResource(), 1);
+ ResourceRequest amReq;
+ if (submissionContext.getAMContainerResourceRequest() != null) {
+ amReq = submissionContext.getAMContainerResourceRequest();
+ } else {
+ amReq =
+ BuilderUtils.newResourceRequest(
+ RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
+ submissionContext.getResource(), 1);
+ }
+
+ // set label expression for AM container
+ if (null == amReq.getNodeLabelExpression()) {
+ amReq.setNodeLabelExpression(submissionContext
+ .getNodeLabelExpression());
+ }
+
try {
SchedulerUtils.validateResourceRequest(amReq,
- scheduler.getMaximumResourceCapability());
+ scheduler.getMaximumResourceCapability(),
+ submissionContext.getQueue(), scheduler);
} catch (InvalidResourceRequestException e) {
LOG.warn("RM app submission failed in validating AM resource request"
+ " for application " + submissionContext.getApplicationId(), e);
throw e;
}
+
+ return amReq;
}
+
+ return null;
}
private boolean isApplicationInFinalState(RMAppState rmAppState) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index a59965f..e824634 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.conf.ConfigurationProvider;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -108,6 +109,10 @@ public interface RMContext {
boolean isWorkPreservingRecoveryEnabled();
+ RMNodeLabelsManager getNodeLabelManager();
+
+ public void setNodeLabelManager(RMNodeLabelsManager mgr);
+
long getEpoch();
ReservationSystem getReservationSystem();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index 78787ee..076c3dd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
@@ -91,6 +92,7 @@ public class RMContextImpl implements RMContext {
private RMApplicationHistoryWriter rmApplicationHistoryWriter;
private SystemMetricsPublisher systemMetricsPublisher;
private ConfigurationProvider configurationProvider;
+ private RMNodeLabelsManager nodeLabelManager;
private long epoch;
private Clock systemClock = new SystemClock();
private long schedulerRecoveryStartTime = 0;
@@ -406,6 +408,16 @@ public class RMContextImpl implements RMContext {
this.epoch = epoch;
}
+ @Override
+ public RMNodeLabelsManager getNodeLabelManager() {
+ return nodeLabelManager;
+ }
+
+ @Override
+ public void setNodeLabelManager(RMNodeLabelsManager mgr) {
+ nodeLabelManager = mgr;
+ }
+
public void setSchedulerRecoveryStartAndWaitTime(long waitTime) {
this.schedulerRecoveryStartTime = systemClock.getTime();
this.schedulerRecoveryWaitTime = waitTime;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
index 29c5953..40d86e5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -84,9 +85,11 @@ public class RMServerUtils {
* requested memory/vcore is non-negative and not greater than max
*/
public static void validateResourceRequests(List<ResourceRequest> ask,
- Resource maximumResource) throws InvalidResourceRequestException {
+ Resource maximumResource, String queueName, YarnScheduler scheduler)
+ throws InvalidResourceRequestException {
for (ResourceRequest resReq : ask) {
- SchedulerUtils.validateResourceRequest(resReq, maximumResource);
+ SchedulerUtils.validateResourceRequest(resReq, maximumResource,
+ queueName, scheduler);
}
}
@@ -132,17 +135,25 @@ public class RMServerUtils {
}
}
+ public static UserGroupInformation verifyAccess(
+ AccessControlList acl, String method, final Log LOG)
+ throws IOException {
+ // by default, this method will use AdminService as module name
+ return verifyAccess(acl, method, "AdminService", LOG);
+ }
+
/**
* Utility method to verify if the current user has access based on the
* passed {@link AccessControlList}
* @param acl the {@link AccessControlList} to check against
* @param method the method name to be logged
+ * @param module, like AdminService or NodeLabelManager
* @param LOG the logger to use
* @return {@link UserGroupInformation} of the current user
* @throws IOException
*/
public static UserGroupInformation verifyAccess(
- AccessControlList acl, String method, final Log LOG)
+ AccessControlList acl, String method, String module, final Log LOG)
throws IOException {
UserGroupInformation user;
try {
@@ -159,7 +170,7 @@ public class RMServerUtils {
" to call '" + method + "'");
RMAuditLogger.logFailure(user.getShortUserName(), method,
- acl.toString(), "AdminService",
+ acl.toString(), module,
RMAuditLogger.AuditConstants.UNAUTHORIZED_USER);
throw new AccessControlException("User " + user.getShortUserName() +
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index ab45020..68cbc7c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMaste
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
@@ -320,6 +321,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
return new AMLivelinessMonitor(this.rmDispatcher);
}
+ protected RMNodeLabelsManager createNodeLabelManager() {
+ return new RMNodeLabelsManager();
+ }
+
protected DelegationTokenRenewer createDelegationTokenRenewer() {
return new DelegationTokenRenewer();
}
@@ -399,6 +404,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor();
addService(amFinishingMonitor);
rmContext.setAMFinishingMonitor(amFinishingMonitor);
+
+ RMNodeLabelsManager nlm = createNodeLabelManager();
+ addService(nlm);
+ rmContext.setNodeLabelManager(nlm);
boolean isRecoveryEnabled = conf.getBoolean(
YarnConfiguration.RECOVERY_ENABLED,
@@ -962,7 +971,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
* instance of {@link RMActiveServices} and initializes it.
* @throws Exception
*/
- void createAndInitActiveServices() throws Exception {
+ protected void createAndInitActiveServices() throws Exception {
activeServices = new RMActiveServices();
activeServices.init(conf);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java
index 0c0fbc0..126560a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -126,14 +127,18 @@ public class CapacitySchedulerPlanFollower implements PlanFollower {
// create the default reservation queue if it doesnt exist
String defReservationQueue = planQueueName + PlanQueue.DEFAULT_QUEUE_SUFFIX;
if (scheduler.getQueue(defReservationQueue) == null) {
- ReservationQueue defQueue =
- new ReservationQueue(scheduler, defReservationQueue, planQueue);
try {
+ ReservationQueue defQueue =
+ new ReservationQueue(scheduler, defReservationQueue, planQueue);
scheduler.addQueue(defQueue);
} catch (SchedulerDynamicEditException e) {
LOG.warn(
"Exception while trying to create default reservation queue for plan: {}",
planQueueName, e);
+ } catch (IOException e) {
+ LOG.warn(
+ "Exception while trying to create default reservation queue for plan: {}",
+ planQueueName, e);
}
}
curReservationNames.add(defReservationQueue);
@@ -186,14 +191,18 @@ public class CapacitySchedulerPlanFollower implements PlanFollower {
for (ReservationAllocation res : sortedAllocations) {
String currResId = res.getReservationId().toString();
if (curReservationNames.contains(currResId)) {
- ReservationQueue resQueue =
- new ReservationQueue(scheduler, currResId, planQueue);
try {
+ ReservationQueue resQueue =
+ new ReservationQueue(scheduler, currResId, planQueue);
scheduler.addQueue(resQueue);
} catch (SchedulerDynamicEditException e) {
LOG.warn(
"Exception while trying to activate reservation: {} for plan: {}",
currResId, planQueueName, e);
+ } catch (IOException e) {
+ LOG.warn(
+ "Exception while trying to activate reservation: {} for plan: {}",
+ currResId, planQueueName, e);
}
}
Resource capToAssign = res.getResourcesAtTime(now);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index c0681aa..1994b36 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -143,6 +144,7 @@ public class RMAppImpl implements RMApp, Recoverable {
private RMAppEvent eventCausingFinalSaving;
private RMAppState targetedFinalState;
private RMAppState recoveredFinalState;
+ private ResourceRequest amReq;
Object transitionTodo;
@@ -342,7 +344,8 @@ public class RMAppImpl implements RMApp, Recoverable {
Configuration config, String name, String user, String queue,
ApplicationSubmissionContext submissionContext, YarnScheduler scheduler,
ApplicationMasterService masterService, long submitTime,
- String applicationType, Set<String> applicationTags) {
+ String applicationType, Set<String> applicationTags,
+ ResourceRequest amReq) {
this.systemClock = new SystemClock();
@@ -361,6 +364,7 @@ public class RMAppImpl implements RMApp, Recoverable {
this.startTime = this.systemClock.getTime();
this.applicationType = applicationType;
this.applicationTags = applicationTags;
+ this.amReq = amReq;
int globalMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
@@ -732,7 +736,7 @@ public class RMAppImpl implements RMApp, Recoverable {
// previously failed attempts(which should not include Preempted,
// hardware error and NM resync) + 1) equal to the max-attempt
// limit.
- maxAppAttempts == (getNumFailedAppAttempts() + 1));
+ maxAppAttempts == (getNumFailedAppAttempts() + 1), amReq);
attempts.put(appAttemptId, attempt);
currentAttempt = attempt;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index fbcb7d7..b5a6237 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -93,7 +93,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
@@ -177,6 +176,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
private Object transitionTodo;
private RMAppAttemptMetrics attemptMetrics = null;
+ private ResourceRequest amReq = null;
private static final StateMachineFactory<RMAppAttemptImpl,
RMAppAttemptState,
@@ -426,7 +426,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
RMContext rmContext, YarnScheduler scheduler,
ApplicationMasterService masterService,
ApplicationSubmissionContext submissionContext,
- Configuration conf, boolean maybeLastAttempt) {
+ Configuration conf, boolean maybeLastAttempt, ResourceRequest amReq) {
this.conf = conf;
this.applicationAttemptId = appAttemptId;
this.rmContext = rmContext;
@@ -442,8 +442,11 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
this.proxiedTrackingUrl = generateProxyUriWithScheme(null);
this.maybeLastAttempt = maybeLastAttempt;
this.stateMachine = stateMachineFactory.make(this);
+
this.attemptMetrics =
new RMAppAttemptMetrics(applicationAttemptId, rmContext);
+
+ this.amReq = amReq;
}
@Override
@@ -885,24 +888,34 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
private static final List<ResourceRequest> EMPTY_CONTAINER_REQUEST_LIST =
new ArrayList<ResourceRequest>();
- private static final class ScheduleTransition
+ @VisibleForTesting
+ public static final class ScheduleTransition
implements
MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
@Override
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
- if (!appAttempt.submissionContext.getUnmanagedAM()) {
- // Request a container for the AM.
- ResourceRequest request =
- BuilderUtils.newResourceRequest(
- AM_CONTAINER_PRIORITY, ResourceRequest.ANY, appAttempt
- .getSubmissionContext().getResource(), 1);
-
+ ApplicationSubmissionContext subCtx = appAttempt.submissionContext;
+ if (!subCtx.getUnmanagedAM()) {
+ // Need reset #containers before create new attempt, because this request
+ // will be passed to scheduler, and scheduler will deduct the number after
+ // AM container allocated
+
+ // Currently, following fields are all hard code,
+ // TODO: change these fields when we want to support
+ // priority/resource-name/relax-locality specification for AM containers
+ // allocation.
+ appAttempt.amReq.setNumContainers(1);
+ appAttempt.amReq.setPriority(AM_CONTAINER_PRIORITY);
+ appAttempt.amReq.setResourceName(ResourceRequest.ANY);
+ appAttempt.amReq.setRelaxLocality(true);
+
// SchedulerUtils.validateResourceRequests is not necessary because
// AM resource has been checked when submission
- Allocation amContainerAllocation = appAttempt.scheduler.allocate(
- appAttempt.applicationAttemptId,
- Collections.singletonList(request), EMPTY_CONTAINER_RELEASE_LIST, null, null);
+ Allocation amContainerAllocation =
+ appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
+ Collections.singletonList(appAttempt.amReq),
+ EMPTY_CONTAINER_RELEASE_LIST, null, null);
if (amContainerAllocation != null
&& amContainerAllocation.getContainers() != null) {
assert (amContainerAllocation.getContainers().size() == 0);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
index a423ea5..afbcbc7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
import java.util.List;
+import java.util.Set;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -135,4 +136,11 @@ public interface RMNode {
* @return containerUpdates accumulated across NM heartbeats.
*/
public List<UpdatedContainerInfo> pullContainerUpdates();
+
+ /**
+ * Get the set of node labels attached to this node.
+ *
+ * @return labels on this node; callers should tolerate {@code null}, which
+ * an implementation may return when no node-label manager is available
+ */
+ public Set<String> getNodeLabels();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index c960b50..13d60ab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -855,4 +855,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
public Set<ContainerId> getLaunchedContainers() {
return this.launchedContainers;
}
+
+  /**
+   * Look up the labels of this node through the node-label manager held by
+   * the RM context.
+   *
+   * @return the labels the label manager reports for this node's id, or
+   *         {@code null} when the context has no node-label manager
+   */
+  @Override
+  public Set<String> getNodeLabels() {
+    return context.getNodeLabelManager() == null
+        ? null
+        : context.getNodeLabelManager().getLabelsOnNode(nodeId);
+  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
index 0bc8ca1..4663a91 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.List;
+import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
@@ -71,4 +72,22 @@ public interface Queue {
*/
public void recoverContainer(Resource clusterResource,
SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer);
+
+ /**
+ * Get the node labels that this queue is allowed to access.
+ * labels={*} means this queue can access any label;
+ * labels={ } means this queue can only access nodes without a label;
+ * labels={a, b, c} means this queue can access label a, b or c.
+ * @return accessible node labels of this queue
+ */
+ public Set<String> getAccessibleNodeLabels();
+
+ /**
+ * Get the default node-label expression of this queue. It is used when
+ * neither the ApplicationSubmissionContext nor the ResourceRequest
+ * specifies a label expression of its own.
+ *
+ * @return default label expression
+ */
+ public String getDefaultNodeLabelExpression();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
index ac37c2f..5d00009 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
@@ -17,23 +17,29 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+import java.io.IOException;
import java.util.List;
+import java.util.Set;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
+import com.google.common.collect.Sets;
+
/**
* Utilities shared by schedulers.
*/
@@ -190,7 +196,8 @@ public class SchedulerUtils {
* request
*/
public static void validateResourceRequest(ResourceRequest resReq,
- Resource maximumResource) throws InvalidResourceRequestException {
+ Resource maximumResource, String queueName, YarnScheduler scheduler)
+ throws InvalidResourceRequestException {
if (resReq.getCapability().getMemory() < 0 ||
resReq.getCapability().getMemory() > maximumResource.getMemory()) {
throw new InvalidResourceRequestException("Invalid resource request"
@@ -209,5 +216,116 @@ public class SchedulerUtils {
+ resReq.getCapability().getVirtualCores()
+ ", maxVirtualCores=" + maximumResource.getVirtualCores());
}
+
+ // Get queue from scheduler
+ QueueInfo queueInfo = null;
+ try {
+ queueInfo = scheduler.getQueueInfo(queueName, false, false);
+ } catch (IOException e) {
+ // it is possible queue cannot get when queue mapping is set, just ignore
+ // the queueInfo here, and move forward
+ }
+
+ // check labels in the resource request.
+ String labelExp = resReq.getNodeLabelExpression();
+
+ // if queue has default label expression, and RR doesn't have, use the
+ // default label expression of queue
+ if (labelExp == null && queueInfo != null) {
+ labelExp = queueInfo.getDefaultNodeLabelExpression();
+ resReq.setNodeLabelExpression(labelExp);
+ }
+
+ if (labelExp != null && !labelExp.trim().isEmpty() && queueInfo != null) {
+ if (!checkQueueLabelExpression(queueInfo.getAccessibleNodeLabels(),
+ labelExp)) {
+ throw new InvalidResourceRequestException("Invalid resource request"
+ + ", queue="
+ + queueInfo.getQueueName()
+ + " doesn't have permission to access all labels "
+ + "in resource request. labelExpression of resource request="
+ + labelExp
+ + ". Queue labels="
+ + (queueInfo.getAccessibleNodeLabels() == null ? "" : StringUtils.join(queueInfo
+ .getAccessibleNodeLabels().iterator(), ',')));
+ }
+ }
+ }
+
+  /**
+   * Decide whether a queue may use a node, based on their label sets.
+   *
+   * @param queueLabels labels the queue can access, may be {@code null}
+   * @param nodeLabels labels attached to the node, may be {@code null}
+   * @return {@code true} when the node carries no label, when the queue
+   *         labels contain {@link RMNodeLabelsManager#ANY}, or when queue and
+   *         node share at least one label; {@code false} otherwise
+   */
+  public static boolean checkQueueAccessToNode(Set<String> queueLabels,
+      Set<String> nodeLabels) {
+    // a node without labels is open to every queue
+    if (nodeLabels == null || nodeLabels.isEmpty()) {
+      return true;
+    }
+    // the node is labeled, so a queue without labels cannot reach it
+    if (queueLabels == null) {
+      return false;
+    }
+    // either the queue accepts everything (*) or it shares a label with the node
+    return queueLabels.contains(RMNodeLabelsManager.ANY)
+        || !Sets.intersection(queueLabels, nodeLabels).isEmpty();
+  }
+
+  /**
+   * Verify that every given label is known to the node-label manager.
+   * The wildcard {@link RMNodeLabelsManager#ANY} is always accepted.
+   *
+   * @param mgr node-label manager to check against, may be {@code null}
+   * @param labels labels to verify, may be {@code null} (nothing is checked)
+   * @throws IOException if labels are given but {@code mgr} is {@code null},
+   *         or if {@code mgr} does not contain one of the labels
+   */
+  public static void checkIfLabelInClusterNodeLabels(RMNodeLabelsManager mgr,
+      Set<String> labels) throws IOException {
+    // nothing to verify
+    if (labels == null) {
+      return;
+    }
+
+    // without a manager only an empty label set is acceptable
+    if (mgr == null) {
+      if (!labels.isEmpty()) {
+        throw new IOException("NodeLabelManager is null, please check");
+      }
+      return;
+    }
+
+    for (String label : labels) {
+      if (label.equals(RMNodeLabelsManager.ANY)) {
+        continue;
+      }
+      if (!mgr.containsNodeLabel(label)) {
+        throw new IOException("NodeLabelManager doesn't include label = "
+            + label + ", please check.");
+      }
+    }
+  }
+
+  /**
+   * Check whether a node with the given labels satisfies a label expression.
+   * The expression is a list of labels joined by {@code &&}; every non-blank
+   * label in it must be present on the node.
+   *
+   * @param nodeLabels labels attached to the node; {@code null} is treated
+   *        like an empty set (a node without labels)
+   * @param labelExpression expression to evaluate, may be {@code null} or
+   *        blank
+   * @return {@code true} if the node can serve a request with this expression
+   */
+  public static boolean checkNodeLabelExpression(Set<String> nodeLabels,
+      String labelExpression) {
+    // empty label expression can only allocate on node with empty labels;
+    // a null label set counts as empty instead of raising a
+    // NullPointerException
+    if (labelExpression == null || labelExpression.trim().isEmpty()) {
+      return nodeLabels == null || nodeLabels.isEmpty();
+    }
+
+    for (String str : labelExpression.split("&&")) {
+      String label = str.trim();
+      if (!label.isEmpty()
+          && (nodeLabels == null || !nodeLabels.contains(label))) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Check whether a queue may be used with the given label expression.
+   * The expression is a list of labels joined by {@code &&}; every non-blank
+   * label in it must be accessible to the queue.
+   *
+   * @param queueLabels labels the queue can access, may be {@code null}
+   * @param labelExpression expression to evaluate, may be {@code null}
+   * @return {@code true} when the queue labels contain
+   *         {@link RMNodeLabelsManager#ANY}, when the expression is
+   *         {@code null}, or when the queue can access every label named in
+   *         the expression
+   */
+  public static boolean checkQueueLabelExpression(Set<String> queueLabels,
+      String labelExpression) {
+    boolean queueAcceptsAny =
+        queueLabels != null && queueLabels.contains(RMNodeLabelsManager.ANY);
+    // a wildcard queue or a missing expression imposes no restriction
+    if (queueAcceptsAny || labelExpression == null) {
+      return true;
+    }
+
+    for (String part : labelExpression.split("&&")) {
+      String label = part.trim();
+      if (label.isEmpty()) {
+        continue;
+      }
+      if (queueLabels == null || !queueLabels.contains(label)) {
+        return false;
+      }
+    }
+    return true;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
new file mode 100644
index 0000000..7159e4d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -0,0 +1,448 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.api.records.QueueACL;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Shared state and bookkeeping for Capacity Scheduler queues: configured and
+ * absolute capacities, per-node-label capacities, accessible node labels,
+ * ACLs and used-resource accounting.
+ */
+public abstract class AbstractCSQueue implements CSQueue {
+
+  CSQueue parent;
+  final String queueName;
+
+  float capacity;
+  float maximumCapacity;
+  float absoluteCapacity;
+  float absoluteMaxCapacity;
+  float absoluteUsedCapacity = 0.0f;
+
+  float usedCapacity = 0.0f;
+  volatile int numContainers;
+
+  final Resource minimumAllocation;
+  final Resource maximumAllocation;
+  QueueState state;
+  final QueueMetrics metrics;
+
+  final ResourceCalculator resourceCalculator;
+  Set<String> accessibleLabels;
+  RMNodeLabelsManager labelManager;
+  String defaultLabelExpression;
+  Resource usedResources = Resources.createResource(0, 0);
+  QueueInfo queueInfo;
+  // per-label capacities; the "absolute" maps are computed in
+  // setupQueueConfigs from the configured ones and the parent queue
+  Map<String, Float> absoluteCapacityByNodeLabels;
+  Map<String, Float> capacitiyByNodeLabels;
+  Map<String, Resource> usedResourcesByNodeLabels = new HashMap<String, Resource>();
+  Map<String, Float> absoluteMaxCapacityByNodeLabels;
+  Map<String, Float> maxCapacityByNodeLabels;
+
+  Map<QueueACL, AccessControlList> acls =
+      new HashMap<QueueACL, AccessControlList>();
+  boolean reservationsContinueLooking;
+
+  private final RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
+
+  /**
+   * Create the queue and read its label configuration.
+   *
+   * @param cs scheduler context supplying allocation limits, configuration,
+   *        resource calculator and the node-label manager
+   * @param queueName short name of this queue
+   * @param parent parent queue, {@code null} for the root
+   * @param old previous incarnation of this queue whose metrics are reused,
+   *        or {@code null} to create fresh metrics
+   * @throws IOException if inherited labels are unknown to the node-label
+   *         manager, or if reading the label capacities fails
+   */
+  public AbstractCSQueue(CapacitySchedulerContext cs,
+      String queueName, CSQueue parent, CSQueue old) throws IOException {
+    this.minimumAllocation = cs.getMinimumResourceCapability();
+    this.maximumAllocation = cs.getMaximumResourceCapability();
+    this.labelManager = cs.getRMContext().getNodeLabelManager();
+    this.parent = parent;
+    this.queueName = queueName;
+    this.resourceCalculator = cs.getResourceCalculator();
+    this.queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
+
+    // must be called after parent and queueName is set
+    this.metrics = old != null ? old.getMetrics() :
+        QueueMetrics.forQueue(getQueuePath(), parent,
+            cs.getConfiguration().getEnableUserMetrics(),
+            cs.getConf());
+
+    // get labels
+    this.accessibleLabels = cs.getConfiguration().getAccessibleNodeLabels(getQueuePath());
+    this.defaultLabelExpression = cs.getConfiguration()
+        .getDefaultNodeLabelExpression(getQueuePath());
+
+    this.queueInfo.setQueueName(queueName);
+
+    // inherit from parent if labels not set
+    if (this.accessibleLabels == null && parent != null) {
+      this.accessibleLabels = parent.getAccessibleNodeLabels();
+      SchedulerUtils.checkIfLabelInClusterNodeLabels(labelManager,
+          this.accessibleLabels);
+    }
+
+    // inherit from parent if labels not set
+    if (this.defaultLabelExpression == null && parent != null
+        && this.accessibleLabels.containsAll(parent.getAccessibleNodeLabels())) {
+      this.defaultLabelExpression = parent.getDefaultNodeLabelExpression();
+    }
+
+    // set capacity by labels
+    capacitiyByNodeLabels =
+        cs.getConfiguration().getNodeLabelCapacities(getQueuePath(), accessibleLabels,
+            labelManager);
+
+    // set maximum capacity by labels
+    maxCapacityByNodeLabels =
+        cs.getConfiguration().getMaximumNodeLabelCapacities(getQueuePath(),
+            accessibleLabels, labelManager);
+  }
+
+  @Override
+  public synchronized float getCapacity() {
+    return capacity;
+  }
+
+  @Override
+  public synchronized float getAbsoluteCapacity() {
+    return absoluteCapacity;
+  }
+
+  @Override
+  public float getAbsoluteMaximumCapacity() {
+    return absoluteMaxCapacity;
+  }
+
+  @Override
+  public synchronized float getAbsoluteUsedCapacity() {
+    return absoluteUsedCapacity;
+  }
+
+  @Override
+  public float getMaximumCapacity() {
+    return maximumCapacity;
+  }
+
+  @Override
+  public synchronized float getUsedCapacity() {
+    return usedCapacity;
+  }
+
+  @Override
+  public synchronized Resource getUsedResources() {
+    return usedResources;
+  }
+
+  public synchronized int getNumContainers() {
+    return numContainers;
+  }
+
+  @Override
+  public synchronized QueueState getState() {
+    return state;
+  }
+
+  @Override
+  public QueueMetrics getMetrics() {
+    return metrics;
+  }
+
+  @Override
+  public String getQueueName() {
+    return queueName;
+  }
+
+  @Override
+  public synchronized CSQueue getParent() {
+    return parent;
+  }
+
+  @Override
+  public synchronized void setParent(CSQueue newParentQueue) {
+    this.parent = (ParentQueue)newParentQueue;
+  }
+
+  @Override
+  public Set<String> getAccessibleNodeLabels() {
+    return accessibleLabels;
+  }
+
+  /**
+   * Check an ACL on this queue, falling back to the parent chain.
+   *
+   * @param acl ACL type to check
+   * @param user user to check
+   * @return {@code true} if this queue or any ancestor allows the user
+   */
+  @Override
+  public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
+    synchronized (this) {
+      if (acls.get(acl).isUserAllowed(user)) {
+        return true;
+      }
+    }
+
+    if (parent != null) {
+      return parent.hasAccess(acl, user);
+    }
+
+    return false;
+  }
+
+  @Override
+  public synchronized void setUsedCapacity(float usedCapacity) {
+    this.usedCapacity = usedCapacity;
+  }
+
+  @Override
+  public synchronized void setAbsoluteUsedCapacity(float absUsedCapacity) {
+    this.absoluteUsedCapacity = absUsedCapacity;
+  }
+
+  /**
+   * Set maximum capacity - used only for testing.
+   * @param maximumCapacity new max capacity
+   */
+  synchronized void setMaxCapacity(float maximumCapacity) {
+    // Sanity check
+    CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
+    float absMaxCapacity =
+        CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent);
+    CSQueueUtils.checkAbsoluteCapacity(getQueueName(), absoluteCapacity,
+        absMaxCapacity);
+
+    this.maximumCapacity = maximumCapacity;
+    this.absoluteMaxCapacity = absMaxCapacity;
+  }
+
+  @Override
+  public float getAbsActualCapacity() {
+    // for now, simply return actual capacity = guaranteed capacity for parent
+    // queue
+    return absoluteCapacity;
+  }
+
+  @Override
+  public String getDefaultNodeLabelExpression() {
+    return defaultLabelExpression;
+  }
+
+  /**
+   * Apply a full set of queue settings, refresh the {@link QueueInfo} record
+   * and queue statistics, and derive the absolute per-label capacities.
+   *
+   * @param clusterResource total cluster resource, used to update statistics
+   * @param capacity configured capacity
+   * @param absoluteCapacity absolute capacity
+   * @param maximumCapacity configured maximum capacity
+   * @param absoluteMaxCapacity absolute maximum capacity
+   * @param state queue state
+   * @param acls queue ACLs
+   * @param labels node labels accessible to this queue
+   * @param defaultLabelExpression default node-label expression
+   * @param nodeLabelCapacities configured capacity per node label (copied)
+   * @param maximumNodeLabelCapacities configured maximum capacity per node
+   *        label (copied)
+   * @param reservationContinueLooking value of the continue-looking flag
+   * @throws IOException if the parent is not the root and this queue's labels
+   *         are not a subset of the parent's labels
+   */
+  synchronized void setupQueueConfigs(Resource clusterResource, float capacity,
+      float absoluteCapacity, float maximumCapacity, float absoluteMaxCapacity,
+      QueueState state, Map<QueueACL, AccessControlList> acls,
+      Set<String> labels, String defaultLabelExpression,
+      Map<String, Float> nodeLabelCapacities,
+      Map<String, Float> maximumNodeLabelCapacities,
+      boolean reservationContinueLooking)
+      throws IOException {
+    // Sanity check
+    CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
+    CSQueueUtils.checkAbsoluteCapacity(getQueueName(), absoluteCapacity,
+        absoluteMaxCapacity);
+
+    this.capacity = capacity;
+    this.absoluteCapacity = absoluteCapacity;
+
+    this.maximumCapacity = maximumCapacity;
+    this.absoluteMaxCapacity = absoluteMaxCapacity;
+
+    this.state = state;
+
+    this.acls = acls;
+
+    // set labels
+    this.accessibleLabels = labels;
+
+    // set label expression
+    this.defaultLabelExpression = defaultLabelExpression;
+
+    // copy node label capacity
+    this.capacitiyByNodeLabels = new HashMap<String, Float>(nodeLabelCapacities);
+    this.maxCapacityByNodeLabels =
+        new HashMap<String, Float>(maximumNodeLabelCapacities);
+
+    this.queueInfo.setAccessibleNodeLabels(this.accessibleLabels);
+    this.queueInfo.setCapacity(this.capacity);
+    this.queueInfo.setMaximumCapacity(this.maximumCapacity);
+    this.queueInfo.setQueueState(this.state);
+    this.queueInfo.setDefaultNodeLabelExpression(this.defaultLabelExpression);
+
+    // Update metrics
+    CSQueueUtils.updateQueueStatistics(
+        resourceCalculator, this, parent, clusterResource, minimumAllocation);
+
+    // Check if labels of this queue is a subset of parent queue, only do this
+    // when we not root
+    if (parent != null && parent.getParent() != null) {
+      if (parent.getAccessibleNodeLabels() != null
+          && !parent.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY)) {
+        // if parent isn't "*", child shouldn't be "*" too
+        if (this.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY)) {
+          throw new IOException("Parent's accessible queue is not ANY(*), "
+              + "but child's accessible queue is *");
+        } else {
+          Set<String> diff =
+              Sets.difference(this.getAccessibleNodeLabels(),
+                  parent.getAccessibleNodeLabels());
+          if (!diff.isEmpty()) {
+            throw new IOException("Some labels of child queue is not a subset "
+                + "of parent queue, these labels=["
+                + StringUtils.join(diff, ",") + "]");
+          }
+        }
+      }
+    }
+
+    // calculate absolute capacity by each node label
+    this.absoluteCapacityByNodeLabels =
+        CSQueueUtils.computeAbsoluteCapacityByNodeLabels(
+            this.capacitiyByNodeLabels, parent);
+
+    // calculate maximum capacity by each node label
+    this.absoluteMaxCapacityByNodeLabels =
+        CSQueueUtils.computeAbsoluteMaxCapacityByNodeLabels(
+            maximumNodeLabelCapacities, parent);
+
+    // check absoluteMaximumNodeLabelCapacities is valid: the absolute
+    // capacities must be compared against the absolute MAX capacities, not
+    // against themselves
+    CSQueueUtils.checkAbsoluteCapacitiesByLabel(getQueueName(),
+        absoluteCapacityByNodeLabels, absoluteMaxCapacityByNodeLabels);
+
+    this.reservationsContinueLooking = reservationContinueLooking;
+  }
+
+  @Private
+  public Resource getMaximumAllocation() {
+    return maximumAllocation;
+  }
+
+  @Private
+  public Resource getMinimumAllocation() {
+    return minimumAllocation;
+  }
+
+  /**
+   * Account for a newly allocated container: add the resource to the queue
+   * total and to the per-label usage, bump the container count and refresh
+   * queue statistics.
+   *
+   * @param clusterResource total cluster resource
+   * @param resource resource of the allocated container
+   * @param nodeLabels labels of the node hosting the container; {@code null}
+   *        or empty is accounted under {@link RMNodeLabelsManager#NO_LABEL}
+   */
+  synchronized void allocateResource(Resource clusterResource,
+      Resource resource, Set<String> nodeLabels) {
+    Resources.addTo(usedResources, resource);
+
+    // Update usedResources by labels
+    if (nodeLabels == null || nodeLabels.isEmpty()) {
+      if (!usedResourcesByNodeLabels.containsKey(RMNodeLabelsManager.NO_LABEL)) {
+        usedResourcesByNodeLabels.put(RMNodeLabelsManager.NO_LABEL,
+            Resources.createResource(0));
+      }
+      Resources.addTo(usedResourcesByNodeLabels.get(RMNodeLabelsManager.NO_LABEL),
+          resource);
+    } else {
+      // only labels this queue can access are tracked
+      for (String label : Sets.intersection(accessibleLabels, nodeLabels)) {
+        if (!usedResourcesByNodeLabels.containsKey(label)) {
+          usedResourcesByNodeLabels.put(label, Resources.createResource(0));
+        }
+        Resources.addTo(usedResourcesByNodeLabels.get(label), resource);
+      }
+    }
+
+    ++numContainers;
+    CSQueueUtils.updateQueueStatistics(resourceCalculator, this, getParent(),
+        clusterResource, minimumAllocation);
+  }
+
+  /**
+   * Account for a released container: subtract the resource from the queue
+   * total and from the per-label usage, refresh queue statistics and lower
+   * the container count.
+   *
+   * @param clusterResource total cluster resource
+   * @param resource resource of the released container
+   * @param nodeLabels labels of the node that hosted the container;
+   *        {@code null} or empty is accounted under
+   *        {@link RMNodeLabelsManager#NO_LABEL}
+   */
+  protected synchronized void releaseResource(Resource clusterResource,
+      Resource resource, Set<String> nodeLabels) {
+    // Update queue metrics
+    Resources.subtractFrom(usedResources, resource);
+
+    // Update usedResources by labels
+    if (null == nodeLabels || nodeLabels.isEmpty()) {
+      if (!usedResourcesByNodeLabels.containsKey(RMNodeLabelsManager.NO_LABEL)) {
+        usedResourcesByNodeLabels.put(RMNodeLabelsManager.NO_LABEL,
+            Resources.createResource(0));
+      }
+      Resources.subtractFrom(
+          usedResourcesByNodeLabels.get(RMNodeLabelsManager.NO_LABEL), resource);
+    } else {
+      for (String label : Sets.intersection(accessibleLabels, nodeLabels)) {
+        if (!usedResourcesByNodeLabels.containsKey(label)) {
+          usedResourcesByNodeLabels.put(label, Resources.createResource(0));
+        }
+        Resources.subtractFrom(usedResourcesByNodeLabels.get(label), resource);
+      }
+    }
+
+    CSQueueUtils.updateQueueStatistics(resourceCalculator, this, getParent(),
+        clusterResource, minimumAllocation);
+    --numContainers;
+  }
+
+  /**
+   * Get the configured capacity of this queue for a node label.
+   *
+   * @param label node label, {@link RMNodeLabelsManager#NO_LABEL} for
+   *        unlabeled nodes
+   * @return 1 for the root queue, {@link #getCapacity()} for NO_LABEL, the
+   *         configured per-label capacity otherwise, or 0 when none is set
+   */
+  @Private
+  public float getCapacityByNodeLabel(String label) {
+    if (null == parent) {
+      return 1f;
+    }
+
+    if (StringUtils.equals(label, RMNodeLabelsManager.NO_LABEL)) {
+      return getCapacity();
+    }
+
+    if (!capacitiyByNodeLabels.containsKey(label)) {
+      return 0;
+    } else {
+      return capacitiyByNodeLabels.get(label);
+    }
+  }
+
+  /**
+   * Get the absolute capacity of this queue for a node label.
+   *
+   * @param label node label, {@link RMNodeLabelsManager#NO_LABEL} for
+   *        unlabeled nodes
+   * @return 1 for the root queue, {@link #getAbsoluteCapacity()} for
+   *         NO_LABEL, the absolute per-label capacity otherwise, or 0 when
+   *         none is set
+   */
+  @Private
+  public float getAbsoluteCapacityByNodeLabel(String label) {
+    if (null == parent) {
+      return 1;
+    }
+
+    if (StringUtils.equals(label, RMNodeLabelsManager.NO_LABEL)) {
+      return getAbsoluteCapacity();
+    }
+
+    // read the absolute capacity map, not the absolute MAX capacity map
+    if (!absoluteCapacityByNodeLabels.containsKey(label)) {
+      return 0;
+    } else {
+      return absoluteCapacityByNodeLabels.get(label);
+    }
+  }
+
+  /**
+   * Get the absolute maximum capacity of this queue for a node label.
+   *
+   * @param label node label, {@link RMNodeLabelsManager#NO_LABEL} for
+   *        unlabeled nodes
+   * @return {@link #getAbsoluteMaximumCapacity()} for NO_LABEL, 1 for the
+   *         root queue, the absolute per-label maximum capacity otherwise,
+   *         or 0 when none is set
+   */
+  @Private
+  public float getAbsoluteMaximumCapacityByNodeLabel(String label) {
+    if (StringUtils.equals(label, RMNodeLabelsManager.NO_LABEL)) {
+      return getAbsoluteMaximumCapacity();
+    }
+
+    if (null == parent) {
+      return 1;
+    }
+
+    if (!absoluteMaxCapacityByNodeLabels.containsKey(label)) {
+      return 0;
+    } else {
+      return absoluteMaxCapacityByNodeLabels.get(label);
+    }
+  }
+
+  @Private
+  public boolean getReservationContinueLooking() {
+    return reservationsContinueLooking;
+  }
+
+  @Private
+  public Map<QueueACL, AccessControlList> getACLs() {
+    return acls;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index db893dc..6438d6c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -72,9 +72,18 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
/**
* Get the configured <em>capacity</em> of the queue.
- * @return queue capacity
+ * @return configured queue capacity
*/
public float getCapacity();
+
+ /**
+ * Get the actual <em>capacity</em> of the queue. This may differ from the
+ * configured capacity when a misconfiguration occurs, e.g. when labels are
+ * added to the cluster.
+ *
+ * @return actual queue capacity
+ */
+ public float getAbsActualCapacity();
/**
* Get capacity of the parent of the queue as a function of the
@@ -106,28 +115,31 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
public float getAbsoluteUsedCapacity();
/**
- * Get the current used capacity of the queue
- * and it's children (if any).
- * @return queue used capacity
- */
- public float getUsedCapacity();
-
- /**
* Set used capacity of the queue.
- * @param usedCapacity used capacity of the queue
+ * @param usedCapacity
+ * used capacity of the queue
*/
public void setUsedCapacity(float usedCapacity);
-
+
/**
* Set absolute used capacity of the queue.
- * @param absUsedCapacity absolute used capacity of the queue
+ * @param absUsedCapacity
+ * absolute used capacity of the queue
*/
public void setAbsoluteUsedCapacity(float absUsedCapacity);
/**
- * Get the currently utilized resources in the cluster
- * by the queue and children (if any).
- * @return used resources by the queue and it's children
+ * Get the current used capacity of nodes without label(s) of the queue
+ * and its children (if any).
+ * @return queue used capacity
+ */
+ public float getUsedCapacity();
+
+ /**
+ * Get the currently utilized resources that are allocated on nodes without
+ * any labels in the cluster by the queue and its children (if any).
+ *
+ * @return used resources by the queue and its children
*/
public Resource getUsedResources();
@@ -259,4 +271,25 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
*/
public void attachContainer(Resource clusterResource,
FiCaSchedulerApp application, RMContainer container);
+
+ /**
+ * Get the absolute capacity that this queue can use for the given label
+ * @param nodeLabel
+ * @return absolute capacity that this queue can use for the given label
+ */
+ public float getAbsoluteCapacityByNodeLabel(String nodeLabel);
+
+ /**
+ * Get the absolute max capacity that this queue can use for the given label
+ * @param nodeLabel
+ * @return absolute max capacity that this queue can use for the given label
+ */
+ public float getAbsoluteMaximumCapacityByNodeLabel(String nodeLabel);
+
+ /**
+ * Get capacity by node label
+ * @param nodeLabel
+ * @return capacity by node label
+ */
+ public float getCapacityByNodeLabel(String nodeLabel);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
index 737062b..0a2fa3a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
@@ -17,9 +17,12 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -40,7 +43,7 @@ class CSQueueUtils {
}
}
- public static void checkAbsoluteCapacities(String queueName,
+ public static void checkAbsoluteCapacity(String queueName,
float absCapacity, float absMaxCapacity) {
if (absMaxCapacity < (absCapacity - EPSILON)) {
throw new IllegalArgumentException("Illegal call to setMaxCapacity. "
@@ -49,6 +52,23 @@ class CSQueueUtils {
+ ")");
}
}
+
+ /**
+  * Verify, for every label present in absCapacities, that the absolute
+  * capacity does not exceed the absolute maximum capacity by more than
+  * EPSILON.
+  *
+  * @param queueName name of the queue; used only in error messages
+  * @param absCapacities absolute capacity keyed by node label
+  * @param absMaximumCapacities absolute maximum capacity keyed by node label
+  * @throws IllegalArgumentException if a label in absCapacities has no entry
+  *         in absMaximumCapacities, or if its absolute capacity is greater
+  *         than its absolute maximum capacity
+  */
+ public static void checkAbsoluteCapacitiesByLabel(String queueName,
+     Map<String, Float> absCapacities,
+     Map<String, Float> absMaximumCapacities) {
+   for (Entry<String, Float> entry : absCapacities.entrySet()) {
+     String label = entry.getKey();
+     float absCapacity = entry.getValue();
+     // Keep the lookup boxed: assigning a missing entry straight to a
+     // primitive float would fail with a bare NullPointerException that
+     // names neither the queue nor the label.
+     Float absMaxCapacity = absMaximumCapacities.get(label);
+     if (absMaxCapacity == null) {
+       throw new IllegalArgumentException("Queue '" + queueName
+           + "' has an absolute capacity (" + absCapacity
+           + ") but no absolute maximumCapacity of label=" + label);
+     }
+     if (absMaxCapacity < (absCapacity - EPSILON)) {
+       throw new IllegalArgumentException("Illegal call to setMaxCapacity. "
+           + "Queue '" + queueName + "' has " + "an absolute capacity ("
+           + absCapacity + ") greater than "
+           + "its absolute maximumCapacity (" + absMaxCapacity + ") of label="
+           + label);
+     }
+   }
+ }
public static float computeAbsoluteMaximumCapacity(
float maximumCapacity, CSQueue parent) {
@@ -56,6 +76,39 @@ class CSQueueUtils {
(parent == null) ? 1.0f : parent.getAbsoluteMaximumCapacity();
return (parentAbsMaxCapacity * maximumCapacity);
}
+
+ /**
+  * Scale each per-label capacity by the parent's absolute capacity for the
+  * same label.
+  *
+  * @param nodeLabelToCapacities capacity keyed by node label
+  * @param parent parent queue, or null
+  * @return nodeLabelToCapacities itself (same instance) when parent is null;
+  *         otherwise a new map holding, per label, the capacity multiplied by
+  *         parent.getAbsoluteCapacityByNodeLabel(label)
+  */
+ public static Map<String, Float> computeAbsoluteCapacityByNodeLabels(
+     Map<String, Float> nodeLabelToCapacities, CSQueue parent) {
+   if (parent == null) {
+     return nodeLabelToCapacities;
+   }
+
+   Map<String, Float> scaled = new HashMap<String, Float>();
+   for (Entry<String, Float> labelAndCapacity
+       : nodeLabelToCapacities.entrySet()) {
+     String nodeLabel = labelAndCapacity.getKey();
+     float ownCapacity = labelAndCapacity.getValue();
+     float parentAbsolute = parent.getAbsoluteCapacityByNodeLabel(nodeLabel);
+     scaled.put(nodeLabel, ownCapacity * parentAbsolute);
+   }
+   return scaled;
+ }
+
+ /**
+  * Scale each per-label maximum capacity by the parent's absolute maximum
+  * capacity for the same label.
+  *
+  * @param maximumNodeLabelToCapacities maximum capacity keyed by node label
+  * @param parent parent queue, or null
+  * @return maximumNodeLabelToCapacities itself (same instance) when parent
+  *         is null; otherwise a new map holding, per label, the maximum
+  *         capacity multiplied by
+  *         parent.getAbsoluteMaximumCapacityByNodeLabel(label)
+  */
+ public static Map<String, Float> computeAbsoluteMaxCapacityByNodeLabels(
+     Map<String, Float> maximumNodeLabelToCapacities, CSQueue parent) {
+   if (parent == null) {
+     return maximumNodeLabelToCapacities;
+   }
+
+   Map<String, Float> scaledMax = new HashMap<String, Float>();
+   for (Entry<String, Float> labelAndMax
+       : maximumNodeLabelToCapacities.entrySet()) {
+     String nodeLabel = labelAndMax.getKey();
+     float ownMax = labelAndMax.getValue();
+     float parentAbsoluteMax =
+         parent.getAbsoluteMaximumCapacityByNodeLabel(nodeLabel);
+     scaledMax.put(nodeLabel, ownMax * parentAbsoluteMax);
+   }
+   return scaledMax;
+ }
public static int computeMaxActiveApplications(
ResourceCalculator calculator,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index ed5518c..9332228 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -20,7 +20,15 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import java.io.InputStream;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -53,8 +61,13 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.*;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -191,6 +204,7 @@ public class CapacityScheduler extends
private boolean scheduleAsynchronously;
private AsyncScheduleThread asyncSchedulerThread;
+ private RMNodeLabelsManager labelManager;
/**
* EXPERT
@@ -275,6 +289,8 @@ public class CapacityScheduler extends
this.applications =
new ConcurrentHashMap<ApplicationId,
SchedulerApplication<FiCaSchedulerApp>>();
+ this.labelManager = rmContext.getNodeLabelManager();
+
initializeQueues(this.conf);
scheduleAsynchronously = this.conf.getScheduleAynschronously();
@@ -446,7 +462,7 @@ public class CapacityScheduler extends
root =
parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT,
queues, queues, noop);
-
+ labelManager.reinitializeQueueLabels(getQueueToLabels());
LOG.info("Initialized root queue " + root);
initializeQueueMappings();
}
@@ -469,10 +485,19 @@ public class CapacityScheduler extends
// Re-configure queues
root.reinitialize(newRoot, clusterResource);
initializeQueueMappings();
-
+
// Re-calculate headroom for active applications
root.updateClusterResource(clusterResource);
+ labelManager.reinitializeQueueLabels(getQueueToLabels());
+ }
+
+ /**
+  * Build a map from queue name to that queue's accessible node labels,
+  * covering every queue in the queues map.
+  *
+  * @return a new map of getQueueName() to getAccessibleNodeLabels()
+  */
+ private Map<String, Set<String>> getQueueToLabels() {
+   Map<String, Set<String>> labelsByQueue =
+       new HashMap<String, Set<String>>();
+   for (CSQueue csQueue : queues.values()) {
+     String name = csQueue.getQueueName();
+     labelsByQueue.put(name, csQueue.getAccessibleNodeLabels());
+   }
+   return labelsByQueue;
}
/**
@@ -515,7 +540,7 @@ public class CapacityScheduler extends
@Lock(CapacityScheduler.class)
static CSQueue parseQueue(
- CapacitySchedulerContext csContext,
+ CapacitySchedulerContext csContext,
CapacitySchedulerConfiguration conf,
CSQueue parent, String queueName, Map<String, CSQueue> queues,
Map<String, CSQueue> oldQueues,
@@ -1094,11 +1119,18 @@ public class CapacityScheduler extends
}
private synchronized void addNode(RMNode nodeManager) {
+ // update this node to node label manager
+ if (labelManager != null) {
+ labelManager.activateNode(nodeManager.getNodeID(),
+ nodeManager.getTotalCapability());
+ }
+
this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager,
usePortForNodeName));
Resources.addTo(clusterResource, nodeManager.getTotalCapability());
root.updateClusterResource(clusterResource);
int numNodes = numNodeManagers.incrementAndGet();
+
LOG.info("Added node " + nodeManager.getNodeAddress() +
" clusterResource: " + clusterResource);
@@ -1108,6 +1140,11 @@ public class CapacityScheduler extends
}
private synchronized void removeNode(RMNode nodeInfo) {
+ // update this node to node label manager
+ if (labelManager != null) {
+ labelManager.deactivateNode(nodeInfo.getNodeID());
+ }
+
FiCaSchedulerNode node = nodes.get(nodeInfo.getNodeID());
if (node == null) {
return;
@@ -1141,6 +1178,7 @@ public class CapacityScheduler extends
}
this.nodes.remove(nodeInfo.getNodeID());
+
LOG.info("Removed node " + nodeInfo.getNodeAddress() +
" clusterResource: " + clusterResource);
}