You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2021/10/27 15:14:27 UTC

[hadoop] branch trunk updated: YARN-10924. Clean up CapacityScheduler#initScheduler (#3581) Contributed by Szilard Nemeth

This is an automated email from the ASF dual-hosted git repository.

shuzirra pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 66ac476  YARN-10924. Clean up CapacityScheduler#initScheduler (#3581) Contributed by Szilard Nemeth
66ac476 is described below

commit 66ac476b4823df351cbaab0971da0adc7d08c242
Author: Szilard Nemeth <95...@users.noreply.github.com>
AuthorDate: Wed Oct 27 17:13:49 2021 +0200

    YARN-10924. Clean up CapacityScheduler#initScheduler (#3581) Contributed by Szilard Nemeth
---
 .../scheduler/capacity/CapacityScheduler.java      | 165 +++++++++++----------
 1 file changed, 89 insertions(+), 76 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 592cb5a..b8091c7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -302,38 +302,13 @@ public class CapacityScheduler extends
       IOException, YarnException {
     writeLock.lock();
     try {
-      String confProviderStr = configuration.get(
-          YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
-          YarnConfiguration.DEFAULT_CONFIGURATION_STORE);
-      switch (confProviderStr) {
-      case YarnConfiguration.FILE_CONFIGURATION_STORE:
-        this.csConfProvider =
-            new FileBasedCSConfigurationProvider(rmContext);
-        break;
-      case YarnConfiguration.MEMORY_CONFIGURATION_STORE:
-      case YarnConfiguration.LEVELDB_CONFIGURATION_STORE:
-      case YarnConfiguration.ZK_CONFIGURATION_STORE:
-      case YarnConfiguration.FS_CONFIGURATION_STORE:
-        this.csConfProvider = new MutableCSConfigurationProvider(rmContext);
-        break;
-      default:
-        throw new IOException("Invalid configuration store class: " +
-            confProviderStr);
-      }
+      this.csConfProvider = getCsConfProvider(configuration);
       this.csConfProvider.init(configuration);
       this.conf = this.csConfProvider.loadConfiguration(configuration);
       validateConf(this.conf);
       this.minimumAllocation = super.getMinimumAllocation();
       initMaximumResourceCapability(super.getMaximumAllocation());
-      this.calculator = this.conf.getResourceCalculator();
-      if (this.calculator instanceof DefaultResourceCalculator
-          && ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
-        throw new YarnRuntimeException("RM uses DefaultResourceCalculator which"
-            + " used only memory as resource-type but invalid resource-types"
-            + " specified " + ResourceUtils.getResourceTypes() + ". Use"
-            + " DominantResourceCalculator instead to make effective use of"
-            + " these resource-types");
-      }
+      this.calculator = initResourceCalculator();
       this.usePortForNodeName = this.conf.getUsePortForNodeName();
       this.applications = new ConcurrentHashMap<>();
       this.labelManager = rmContext.getNodeLabelManager();
@@ -341,71 +316,109 @@ public class CapacityScheduler extends
       this.queueManager = new CapacitySchedulerQueueManager(yarnConf,
           this.labelManager, this.appPriorityACLManager);
       this.queueManager.setCapacitySchedulerContext(this);
-
       this.workflowPriorityMappingsMgr = new WorkflowPriorityMappingsManager();
-
       this.activitiesManager = new ActivitiesManager(rmContext);
       activitiesManager.init(conf);
       initializeQueues(this.conf);
       this.isLazyPreemptionEnabled = conf.getLazyPreemptionEnabled();
-
-      scheduleAsynchronously = this.conf.getScheduleAynschronously();
-      asyncScheduleInterval = this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
-          DEFAULT_ASYNC_SCHEDULER_INTERVAL);
-
       this.assignMultipleEnabled = this.conf.getAssignMultipleEnabled();
       this.maxAssignPerHeartbeat = this.conf.getMaxAssignPerHeartbeat();
-
-      this.appShouldFailFast = CapacitySchedulerConfiguration.shouldAppFailFast(
-          getConfig());
-
-      // number of threads for async scheduling
-      int maxAsyncSchedulingThreads = this.conf.getInt(
-          CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
-          1);
-      maxAsyncSchedulingThreads = Math.max(maxAsyncSchedulingThreads, 1);
-
-      if (scheduleAsynchronously) {
-        asyncSchedulerThreads = new ArrayList<>();
-        for (int i = 0; i < maxAsyncSchedulingThreads; i++) {
-          asyncSchedulerThreads.add(new AsyncScheduleThread(this));
-        }
-        resourceCommitterService = new ResourceCommitterService(this);
-        asyncMaxPendingBacklogs = this.conf.getInt(
-            CapacitySchedulerConfiguration.
-                SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS,
-            CapacitySchedulerConfiguration.
-                DEFAULT_SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS);
-      }
+      this.appShouldFailFast = CapacitySchedulerConfiguration.shouldAppFailFast(getConfig());
+      initAsyncSchedulingProperties();
 
       // Setup how many containers we can allocate for each round
       offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit();
 
-      // Register CS specific multi-node policies to common MultiNodeManager
-      // which will add to a MultiNodeSorter which gives a pre-sorted list of
-      // nodes to scheduler's allocation.
-      multiNodePlacementEnabled = this.conf.getMultiNodePlacementEnabled();
-      if(rmContext.getMultiNodeSortingManager() != null) {
-        rmContext.getMultiNodeSortingManager().registerMultiNodePolicyNames(
-            multiNodePlacementEnabled,
-            this.conf.getMultiNodePlacementPolicies());
-      }
-
-      LOG.info("Initialized CapacityScheduler with " + "calculator="
-          + getResourceCalculator().getClass() + ", " + "minimumAllocation="
-          + getMinimumResourceCapability() + ", " + "maximumAllocation="
-          + getMaximumResourceCapability() + ", " + "asynchronousScheduling="
-          + scheduleAsynchronously + ", " + "asyncScheduleInterval="
-          + asyncScheduleInterval + "ms" + ",multiNodePlacementEnabled="
-          + multiNodePlacementEnabled + ", " + "assignMultipleEnabled="
-          + assignMultipleEnabled + ", " + "maxAssignPerHeartbeat="
-          + maxAssignPerHeartbeat + ", " + "offswitchPerHeartbeatLimit="
-          + offswitchPerHeartbeatLimit);
+      initMultiNodePlacement();
+      printSchedulerInitialized();
     } finally {
       writeLock.unlock();
     }
   }
 
+  private CSConfigurationProvider getCsConfProvider(Configuration configuration)
+      throws IOException {
+    String confProviderStr = configuration.get(
+        YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
+        YarnConfiguration.DEFAULT_CONFIGURATION_STORE);
+    switch (confProviderStr) {
+    case YarnConfiguration.FILE_CONFIGURATION_STORE:
+      return new FileBasedCSConfigurationProvider(rmContext);
+    case YarnConfiguration.MEMORY_CONFIGURATION_STORE:
+    case YarnConfiguration.LEVELDB_CONFIGURATION_STORE:
+    case YarnConfiguration.ZK_CONFIGURATION_STORE:
+    case YarnConfiguration.FS_CONFIGURATION_STORE:
+      return new MutableCSConfigurationProvider(rmContext);
+    default:
+      throw new IOException("Invalid configuration store class: " + confProviderStr);
+    }
+  }
+
+  private ResourceCalculator initResourceCalculator() {
+    ResourceCalculator resourceCalculator = this.conf.getResourceCalculator();
+    if (resourceCalculator instanceof DefaultResourceCalculator
+        && ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
+      throw new YarnRuntimeException("RM uses DefaultResourceCalculator which"
+          + " used only memory as resource-type but invalid resource-types"
+          + " specified " + ResourceUtils.getResourceTypes() + ". Use"
+          + " DominantResourceCalculator instead to make effective use of"
+          + " these resource-types");
+    }
+    return resourceCalculator;
+  }
+
+  private void initAsyncSchedulingProperties() {
+    scheduleAsynchronously = this.conf.getScheduleAynschronously();
+    asyncScheduleInterval = this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
+        DEFAULT_ASYNC_SCHEDULER_INTERVAL);
+
+    // number of threads for async scheduling
+    int maxAsyncSchedulingThreads = this.conf.getInt(
+        CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD, 1);
+    maxAsyncSchedulingThreads = Math.max(maxAsyncSchedulingThreads, 1);
+
+    if (scheduleAsynchronously) {
+      asyncSchedulerThreads = new ArrayList<>();
+      for (int i = 0; i < maxAsyncSchedulingThreads; i++) {
+        asyncSchedulerThreads.add(new AsyncScheduleThread(this));
+      }
+      resourceCommitterService = new ResourceCommitterService(this);
+      asyncMaxPendingBacklogs = this.conf.getInt(
+          CapacitySchedulerConfiguration.
+              SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS,
+          CapacitySchedulerConfiguration.
+              DEFAULT_SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS);
+    }
+  }
+
+  private void initMultiNodePlacement() {
+    // Register CS specific multi-node policies to common MultiNodeManager
+    // which will add to a MultiNodeSorter which gives a pre-sorted list of
+    // nodes to scheduler's allocation.
+    multiNodePlacementEnabled = this.conf.getMultiNodePlacementEnabled();
+    if (rmContext.getMultiNodeSortingManager() != null) {
+      rmContext.getMultiNodeSortingManager().registerMultiNodePolicyNames(
+          multiNodePlacementEnabled,
+          this.conf.getMultiNodePlacementPolicies());
+    }
+  }
+
+  private void printSchedulerInitialized() {
+    LOG.info("Initialized CapacityScheduler with calculator={}, minimumAllocation={}, " +
+            "maximumAllocation={}, asynchronousScheduling={}, asyncScheduleInterval={} ms, " +
+            "multiNodePlacementEnabled={}, assignMultipleEnabled={}, maxAssignPerHeartbeat={}, " +
+            "offswitchPerHeartbeatLimit={}",
+        getResourceCalculator().getClass(),
+        getMinimumResourceCapability(),
+        getMaximumResourceCapability(),
+        scheduleAsynchronously,
+        asyncScheduleInterval,
+        multiNodePlacementEnabled,
+        assignMultipleEnabled,
+        maxAssignPerHeartbeat,
+        offswitchPerHeartbeatLimit);
+  }
+
   private void startSchedulerThreads() {
     writeLock.lock();
     try {

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org