You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2021/10/27 15:14:27 UTC
[hadoop] branch trunk updated: YARN-10924. Clean up CapacityScheduler#initScheduler (#3581) Contributed by Szilard Nemeth
This is an automated email from the ASF dual-hosted git repository.
shuzirra pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 66ac476 YARN-10924. Clean up CapacityScheduler#initScheduler (#3581) Contributed by Szilard Nemeth
66ac476 is described below
commit 66ac476b4823df351cbaab0971da0adc7d08c242
Author: Szilard Nemeth <95...@users.noreply.github.com>
AuthorDate: Wed Oct 27 17:13:49 2021 +0200
YARN-10924. Clean up CapacityScheduler#initScheduler (#3581) Contributed by Szilard Nemeth
---
.../scheduler/capacity/CapacityScheduler.java | 165 +++++++++++----------
1 file changed, 89 insertions(+), 76 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 592cb5a..b8091c7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -302,38 +302,13 @@ public class CapacityScheduler extends
IOException, YarnException {
writeLock.lock();
try {
- String confProviderStr = configuration.get(
- YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
- YarnConfiguration.DEFAULT_CONFIGURATION_STORE);
- switch (confProviderStr) {
- case YarnConfiguration.FILE_CONFIGURATION_STORE:
- this.csConfProvider =
- new FileBasedCSConfigurationProvider(rmContext);
- break;
- case YarnConfiguration.MEMORY_CONFIGURATION_STORE:
- case YarnConfiguration.LEVELDB_CONFIGURATION_STORE:
- case YarnConfiguration.ZK_CONFIGURATION_STORE:
- case YarnConfiguration.FS_CONFIGURATION_STORE:
- this.csConfProvider = new MutableCSConfigurationProvider(rmContext);
- break;
- default:
- throw new IOException("Invalid configuration store class: " +
- confProviderStr);
- }
+ this.csConfProvider = getCsConfProvider(configuration);
this.csConfProvider.init(configuration);
this.conf = this.csConfProvider.loadConfiguration(configuration);
validateConf(this.conf);
this.minimumAllocation = super.getMinimumAllocation();
initMaximumResourceCapability(super.getMaximumAllocation());
- this.calculator = this.conf.getResourceCalculator();
- if (this.calculator instanceof DefaultResourceCalculator
- && ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
- throw new YarnRuntimeException("RM uses DefaultResourceCalculator which"
- + " used only memory as resource-type but invalid resource-types"
- + " specified " + ResourceUtils.getResourceTypes() + ". Use"
- + " DominantResourceCalculator instead to make effective use of"
- + " these resource-types");
- }
+ this.calculator = initResourceCalculator();
this.usePortForNodeName = this.conf.getUsePortForNodeName();
this.applications = new ConcurrentHashMap<>();
this.labelManager = rmContext.getNodeLabelManager();
@@ -341,71 +316,109 @@ public class CapacityScheduler extends
this.queueManager = new CapacitySchedulerQueueManager(yarnConf,
this.labelManager, this.appPriorityACLManager);
this.queueManager.setCapacitySchedulerContext(this);
-
this.workflowPriorityMappingsMgr = new WorkflowPriorityMappingsManager();
-
this.activitiesManager = new ActivitiesManager(rmContext);
activitiesManager.init(conf);
initializeQueues(this.conf);
this.isLazyPreemptionEnabled = conf.getLazyPreemptionEnabled();
-
- scheduleAsynchronously = this.conf.getScheduleAynschronously();
- asyncScheduleInterval = this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
- DEFAULT_ASYNC_SCHEDULER_INTERVAL);
-
this.assignMultipleEnabled = this.conf.getAssignMultipleEnabled();
this.maxAssignPerHeartbeat = this.conf.getMaxAssignPerHeartbeat();
-
- this.appShouldFailFast = CapacitySchedulerConfiguration.shouldAppFailFast(
- getConfig());
-
- // number of threads for async scheduling
- int maxAsyncSchedulingThreads = this.conf.getInt(
- CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
- 1);
- maxAsyncSchedulingThreads = Math.max(maxAsyncSchedulingThreads, 1);
-
- if (scheduleAsynchronously) {
- asyncSchedulerThreads = new ArrayList<>();
- for (int i = 0; i < maxAsyncSchedulingThreads; i++) {
- asyncSchedulerThreads.add(new AsyncScheduleThread(this));
- }
- resourceCommitterService = new ResourceCommitterService(this);
- asyncMaxPendingBacklogs = this.conf.getInt(
- CapacitySchedulerConfiguration.
- SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS,
- CapacitySchedulerConfiguration.
- DEFAULT_SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS);
- }
+ this.appShouldFailFast = CapacitySchedulerConfiguration.shouldAppFailFast(getConfig());
+ initAsyncSchedulingProperties();
// Setup how many containers we can allocate for each round
offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit();
- // Register CS specific multi-node policies to common MultiNodeManager
- // which will add to a MultiNodeSorter which gives a pre-sorted list of
- // nodes to scheduler's allocation.
- multiNodePlacementEnabled = this.conf.getMultiNodePlacementEnabled();
- if(rmContext.getMultiNodeSortingManager() != null) {
- rmContext.getMultiNodeSortingManager().registerMultiNodePolicyNames(
- multiNodePlacementEnabled,
- this.conf.getMultiNodePlacementPolicies());
- }
-
- LOG.info("Initialized CapacityScheduler with " + "calculator="
- + getResourceCalculator().getClass() + ", " + "minimumAllocation="
- + getMinimumResourceCapability() + ", " + "maximumAllocation="
- + getMaximumResourceCapability() + ", " + "asynchronousScheduling="
- + scheduleAsynchronously + ", " + "asyncScheduleInterval="
- + asyncScheduleInterval + "ms" + ",multiNodePlacementEnabled="
- + multiNodePlacementEnabled + ", " + "assignMultipleEnabled="
- + assignMultipleEnabled + ", " + "maxAssignPerHeartbeat="
- + maxAssignPerHeartbeat + ", " + "offswitchPerHeartbeatLimit="
- + offswitchPerHeartbeatLimit);
+ initMultiNodePlacement();
+ printSchedulerInitialized();
} finally {
writeLock.unlock();
}
}
+ private CSConfigurationProvider getCsConfProvider(Configuration configuration)
+ throws IOException {
+ String confProviderStr = configuration.get(
+ YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
+ YarnConfiguration.DEFAULT_CONFIGURATION_STORE);
+ switch (confProviderStr) {
+ case YarnConfiguration.FILE_CONFIGURATION_STORE:
+ return new FileBasedCSConfigurationProvider(rmContext);
+ case YarnConfiguration.MEMORY_CONFIGURATION_STORE:
+ case YarnConfiguration.LEVELDB_CONFIGURATION_STORE:
+ case YarnConfiguration.ZK_CONFIGURATION_STORE:
+ case YarnConfiguration.FS_CONFIGURATION_STORE:
+ return new MutableCSConfigurationProvider(rmContext);
+ default:
+ throw new IOException("Invalid configuration store class: " + confProviderStr);
+ }
+ }
+
+ private ResourceCalculator initResourceCalculator() {
+ ResourceCalculator resourceCalculator = this.conf.getResourceCalculator();
+ if (resourceCalculator instanceof DefaultResourceCalculator
+ && ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
+ throw new YarnRuntimeException("RM uses DefaultResourceCalculator which"
+ + " used only memory as resource-type but invalid resource-types"
+ + " specified " + ResourceUtils.getResourceTypes() + ". Use"
+ + " DominantResourceCalculator instead to make effective use of"
+ + " these resource-types");
+ }
+ return resourceCalculator;
+ }
+
+ private void initAsyncSchedulingProperties() {
+ scheduleAsynchronously = this.conf.getScheduleAynschronously();
+ asyncScheduleInterval = this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
+ DEFAULT_ASYNC_SCHEDULER_INTERVAL);
+
+ // number of threads for async scheduling
+ int maxAsyncSchedulingThreads = this.conf.getInt(
+ CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD, 1);
+ maxAsyncSchedulingThreads = Math.max(maxAsyncSchedulingThreads, 1);
+
+ if (scheduleAsynchronously) {
+ asyncSchedulerThreads = new ArrayList<>();
+ for (int i = 0; i < maxAsyncSchedulingThreads; i++) {
+ asyncSchedulerThreads.add(new AsyncScheduleThread(this));
+ }
+ resourceCommitterService = new ResourceCommitterService(this);
+ asyncMaxPendingBacklogs = this.conf.getInt(
+ CapacitySchedulerConfiguration.
+ SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS,
+ CapacitySchedulerConfiguration.
+ DEFAULT_SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS);
+ }
+ }
+
+ private void initMultiNodePlacement() {
+ // Register CS specific multi-node policies to common MultiNodeManager
+ // which will add to a MultiNodeSorter which gives a pre-sorted list of
+ // nodes to scheduler's allocation.
+ multiNodePlacementEnabled = this.conf.getMultiNodePlacementEnabled();
+ if (rmContext.getMultiNodeSortingManager() != null) {
+ rmContext.getMultiNodeSortingManager().registerMultiNodePolicyNames(
+ multiNodePlacementEnabled,
+ this.conf.getMultiNodePlacementPolicies());
+ }
+ }
+
+ private void printSchedulerInitialized() {
+ LOG.info("Initialized CapacityScheduler with calculator={}, minimumAllocation={}, " +
+ "maximumAllocation={}, asynchronousScheduling={}, asyncScheduleInterval={} ms, " +
+ "multiNodePlacementEnabled={}, assignMultipleEnabled={}, maxAssignPerHeartbeat={}, " +
+ "offswitchPerHeartbeatLimit={}",
+ getResourceCalculator().getClass(),
+ getMinimumResourceCapability(),
+ getMaximumResourceCapability(),
+ scheduleAsynchronously,
+ asyncScheduleInterval,
+ multiNodePlacementEnabled,
+ assignMultipleEnabled,
+ maxAssignPerHeartbeat,
+ offswitchPerHeartbeatLimit);
+ }
+
private void startSchedulerThreads() {
writeLock.lock();
try {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org