You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/02/18 02:08:59 UTC

[GitHub] [spark] HyukjinKwon commented on a change in pull request #27313: [SPARK-29148][CORE] Add stage level scheduling dynamic allocation and scheduler backend changes

HyukjinKwon commented on a change in pull request #27313: [SPARK-29148][CORE] Add stage level scheduling dynamic allocation and scheduler backend changes
URL: https://github.com/apache/spark/pull/27313#discussion_r380423718
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
 ##########
 @@ -67,6 +76,138 @@ class ResourceProfile(
     taskResources.get(ResourceProfile.CPUS).map(_.amount.toInt)
   }
 
+  private[spark] def getNumSlotsPerAddress(resource: String, sparkConf: SparkConf): Int = {
+    _executorResourceSlotsPerAddr.getOrElse {
+      calculateTasksAndLimitingResource(sparkConf)
+    }
+    _executorResourceSlotsPerAddr.get.getOrElse(resource,
+      throw new SparkException(s"Resource $resource doesn't exist in profile id: $id"))
+  }
+
+  // Maximum tasks you could put on an executor with this profile based on the limiting resource.
+  // If the executor cores config is not present this value is based on the other resources
+  // available or 1 if no other resources. You need to check the isCoresLimitKnown to
+  // calculate proper value.
+  private[spark] def maxTasksPerExecutor(sparkConf: SparkConf): Int = {
+    _maxTasksPerExecutor.getOrElse {
+      calculateTasksAndLimitingResource(sparkConf)
+      _maxTasksPerExecutor.get
+    }
+  }
+
+  // Returns whether the executor cores was available to use to calculate the max tasks
+  // per executor and limiting resource. Some cluster managers (like standalone and coarse
+  // grained mesos) don't use the cores config by default so we can't use it to calculate slots.
+  private[spark] def isCoresLimitKnown: Boolean = _coresLimitKnown
+
+  // The resource that has the least amount of slots per executor. Its possible multiple or all
+  // resources result in same number of slots and this could be any of those.
+  // If the executor cores config is not present this value is based on the other resources
+  // available or empty string if no other resources. You need to check the isCoresLimitKnown to
+  // calculate proper value.
+  private[spark] def limitingResource(sparkConf: SparkConf): String = {
+    _limitingResource.getOrElse {
+      calculateTasksAndLimitingResource(sparkConf)
+      _limitingResource.get
+    }
+  }
+
+  // executor cores config is not set for some masters by default and the default value
+  // only applies to yarn/k8s
+  private def shouldCheckExecutorCores(sparkConf: SparkConf): Boolean = {
+    val master = sparkConf.getOption("spark.master")
+    sparkConf.contains(EXECUTOR_CORES) ||
+      (master.isDefined && (master.get.equalsIgnoreCase("yarn") || master.get.startsWith("k8s")))
+  }
+
+  /**
+   * Utility function to calculate the number of tasks you can run on a single Executor based
+   * on the task and executor resource requests in the ResourceProfile. This will be based
+   * off the resource that is most restrictive. For instance, if the executor
+   * request is for 4 cpus and 2 gpus and your task request is for 1 cpu and 1 gpu each, the
+   * limiting resource is gpu and the number of tasks you can run on a single executor is 2.
+   * This function also sets the limiting resource, isCoresLimitKnown and number of slots per
+   * resource address.
+   */
+  private def calculateTasksAndLimitingResource(sparkConf: SparkConf): Unit = synchronized {
+    val shouldCheckExecCores = shouldCheckExecutorCores(sparkConf)
+    var (taskLimit, limitingResource) = if (shouldCheckExecCores) {
+      val cpusPerTask = taskResources.get(ResourceProfile.CPUS)
+        .map(_.amount).getOrElse(sparkConf.get(CPUS_PER_TASK).toDouble).toInt
+      assert(cpusPerTask > 0, "CPUs per task configuration has to be > 0")
+      val coresPerExecutor = getExecutorCores.getOrElse(sparkConf.get(EXECUTOR_CORES))
+      _coresLimitKnown = true
+      ResourceUtils.validateTaskCpusLargeEnough(coresPerExecutor, cpusPerTask)
+      val tasksBasedOnCores = coresPerExecutor / cpusPerTask
+      // Note that if the cores per executor aren't set properly this calculation could be off,
+      // we default it to just be 1 in order to allow checking of the rest of the custom
+      // resources. We set the limit based on the other resources available.
+      (tasksBasedOnCores, ResourceProfile.CPUS)
+    } else {
+      (-1, "")
+    }
+    val numPartsPerResourceMap = new mutable.HashMap[String, Int]
+    numPartsPerResourceMap(ResourceProfile.CORES) = 1
+    val taskResourcesToCheck = new mutable.HashMap[String, TaskResourceRequest]
+    taskResourcesToCheck ++= ResourceProfile.getCustomTaskResources(this)
+    val execResourceToCheck = ResourceProfile.getCustomExecutorResources(this)
+    execResourceToCheck.foreach { case (rName, execReq) =>
+      val taskReq = taskResources.get(rName).map(_.amount).getOrElse(0.0)
+      numPartsPerResourceMap(rName) = 1
+      if (taskReq > 0.0) {
+        if (taskReq > execReq.amount) {
+          throw new SparkException(s"The executor resource: $rName, amount: ${execReq.amount} " +
+            s"needs to be >= the task resource request amount of $taskReq")
+        }
+        val (numPerTask, parts) = ResourceUtils.calculateAmountAndPartsForFraction(taskReq)
+        numPartsPerResourceMap(rName) = parts
+        val numTasks = ((execReq.amount * parts) / numPerTask).toInt
+        if (taskLimit == -1 || numTasks < taskLimit) {
+          if (shouldCheckExecCores) {
+            // TODO - until resource profiles full implemented we need to error if cores not
+            // limiting resource because the scheduler code uses that for slots
+            throw new IllegalArgumentException("The number of slots on an executor has to be " +
+              "limited by the number of cores, otherwise you waste resources and " +
+              "dynamic allocation doesn't work properly. Your configuration has " +
+              s"core/task cpu slots = ${taskLimit} and " +
+              s"${execReq.resourceName} = ${numTasks}. " +
+              "Please adjust your configuration so that all resources require same number " +
+              "of executor slots.")
+          }
+          limitingResource = rName
+          taskLimit = numTasks
+        }
+        taskResourcesToCheck -= rName
+      } else {
+        logWarning(s"The executor resource config for resource: $rName was specified but " +
+          "no corresponding task resource request was specified.")
+      }
+    }
+    if(!shouldCheckExecCores) {
+      // if we can't rely on the executor cores config throw a warning for user
+      logWarning("Please ensure that the number of slots available on your " +
 
 Review comment:
   Hey @tgravescs, I was investigating a warning message that suddenly started appearing in my local environment, presumably after this change. My Spark shell now shows this warning consistently when run locally:
   
   ```
   $ ./bin/spark-shell
   ```
   ```
   ...
   20/02/18 11:04:56 WARN ResourceProfile: Please ensure that the number of slots available on your executors is 
   limited by the number of cores to task cpus and not another custom resource. If cores is not the limiting resource 
   then dynamic allocation will not work properly!
   ```
   
   Do you have any idea why this is happening?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org