Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/09/06 07:50:16 UTC

[GitHub] [spark] Ngone51 commented on a diff in pull request #37268: [SPARK-39853][CORE] Support stage level task resource profile for standalone cluster when dynamic allocation disabled

Ngone51 commented on code in PR #37268:
URL: https://github.com/apache/spark/pull/37268#discussion_r963292896


##########
core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala:
##########
@@ -253,6 +263,38 @@ class ResourceProfile(
   }
 }
 
+/**
+ * A resource profile that only contains task resources. It can be used for stage-level task
+ * scheduling when dynamic allocation is disabled: tasks will be scheduled onto executors with
+ * the default resource profile, based on the task resources described by this profile.
+ * When dynamic allocation is enabled, new executors will be requested for this profile based on
+ * the default built-in executor resources, and tasks will be assigned by resource profile id.
+ *
+ * @param taskResources Resource requests for tasks. Mapped from the resource
+ *                      name (e.g., cores, memory, CPU) to its specific request.
+ */
+@Evolving
+@Since("3.4.0")
+class TaskResourceProfile (

Review Comment:
   Why not make it a private class first?
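
For context on how such a task-only profile would be used, here is a usage sketch. The builder and request classes below (`TaskResourceRequests`, `ResourceProfileBuilder`, `RDD.withResources`) are existing Spark APIs; whether `build()` returns a `TaskResourceProfile` when only task requirements are supplied is what this PR introduces, so treat that part as an assumption rather than current behavior.

```scala
import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}

// Request 1 GPU per task for a single stage; no executor resources are specified,
// so under this PR the builder would produce a task-only profile.
val taskReqs = new TaskResourceRequests().resource("gpu", 1)
val taskOnlyProfile = new ResourceProfileBuilder().require(taskReqs).build()

// `rdd` is assumed to be an existing RDD; withResources pins the profile to the
// tasks of this stage only.
// rdd.withResources(taskOnlyProfile).mapPartitions(iter => iter).collect()
```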



##########
core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala:
##########
@@ -253,6 +263,38 @@ class ResourceProfile(
   }
 }
 
+/**
+ * A resource profile that only contains task resources. It can be used for stage-level task
+ * scheduling when dynamic allocation is disabled: tasks will be scheduled onto executors with
+ * the default resource profile, based on the task resources described by this profile.
+ * When dynamic allocation is enabled, new executors will be requested for this profile based on
+ * the default built-in executor resources, and tasks will be assigned by resource profile id.
+ *
+ * @param taskResources Resource requests for tasks. Mapped from the resource
+ *                      name (e.g., cores, memory, CPU) to its specific request.
+ */
+@Evolving
+@Since("3.4.0")
+class TaskResourceProfile (
+    override val taskResources: Map[String, TaskResourceRequest])
+  extends ResourceProfile(Map.empty, taskResources) {
+
+  override protected[spark] def getCustomExecutorResources():
+      Map[String, ExecutorResourceRequest] = {
+    if (SparkEnv.get == null) {
+      throw new IllegalStateException("SparkEnv should not be empty.")
+    }
+    val sparkConf = SparkEnv.get.conf
+
+    if (!Utils.isDynamicAllocationEnabled(sparkConf)) {
+      ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+        .getCustomExecutorResources()
+    } else {
+      super.getCustomExecutorResources()

Review Comment:
   `super.getCustomExecutorResources()` returns empty?
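
The question follows from how the class is constructed: `TaskResourceProfile` passes `Map.empty` as the parent's executor resources, so the inherited accessor has nothing custom to return. A minimal standalone sketch of that pattern (plain Scala, not Spark code; the names are illustrative only):

```scala
// The parent keeps a map of executor resources and exposes only the "custom" ones,
// i.e. everything that is not a built-in resource such as cores or memory.
class Profile(val executorResources: Map[String, String]) {
  protected def customExecutorResources: Map[String, String] =
    executorResources.filter { case (name, _) => name != "cores" && name != "memory" }
}

// The task-only subclass hands the parent an empty map, so the inherited accessor
// (the analogue of super.getCustomExecutorResources()) always yields Map().
class TaskOnlyProfile extends Profile(Map.empty) {
  def customViaSuper: Map[String, String] = customExecutorResources
}
```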



##########
core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala:
##########
@@ -59,35 +59,64 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf,
   private val testExceptionThrown = sparkConf.get(RESOURCE_PROFILE_MANAGER_TESTING)
 
   /**
-   * If we use anything except the default profile, it's only supported on YARN and Kubernetes
-   * with dynamic allocation enabled. Throw an exception if not supported.
+   * If we use anything other than the default profile, it is supported on YARN, Kubernetes, and
+   * Standalone with dynamic allocation enabled, and as a task resource profile on Standalone with
+   * dynamic allocation disabled. Throw an exception if not supported.
    */
   private[spark] def isSupported(rp: ResourceProfile): Boolean = {
-    val isNotDefaultProfile = rp.id != ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
-    val notYarnOrK8sOrStandaloneAndNotDefaultProfile =
-      isNotDefaultProfile && !(isYarn || isK8s || isStandalone)
-    val YarnOrK8sOrStandaloneNotDynAllocAndNotDefaultProfile =
-      isNotDefaultProfile && (isYarn || isK8s || isStandalone) && !dynamicEnabled
-    // We want the exception to be thrown only when we are specifically testing for the
-    // exception or in a real application. Otherwise in all other testing scenarios we want
-    // to skip throwing the exception so that we can test in other modes to make testing easier.
-    if ((notRunningUnitTests || testExceptionThrown) &&
+    if (rp.isInstanceOf[TaskResourceProfile] && !dynamicEnabled) {
+      if ((notRunningUnitTests || testExceptionThrown) && !isStandalone) {
+        throw new SparkException("TaskResourceProfiles are only supported for Standalone " +
+          "cluster for now when dynamic allocation is disabled.")
+      }
+    } else {
+      val isNotDefaultProfile = rp.id != ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
+      val notYarnOrK8sOrStandaloneAndNotDefaultProfile =
+        isNotDefaultProfile && !(isYarn || isK8s || isStandalone)
+      val YarnOrK8sOrStandaloneNotDynAllocAndNotDefaultProfile =
+        isNotDefaultProfile && (isYarn || isK8s || isStandalone) && !dynamicEnabled
+
+      // We want the exception to be thrown only when we are specifically testing for the
+      // exception or in a real application. Otherwise in all other testing scenarios we want
+      // to skip throwing the exception so that we can test in other modes to make testing easier.
+      if ((notRunningUnitTests || testExceptionThrown) &&
         (notYarnOrK8sOrStandaloneAndNotDefaultProfile ||
           YarnOrK8sOrStandaloneNotDynAllocAndNotDefaultProfile)) {
-      throw new SparkException("ResourceProfiles are only supported on YARN and Kubernetes " +
-        "and Standalone with dynamic allocation enabled.")
-    }
+        throw new SparkException("ResourceProfiles are only supported on YARN and Kubernetes " +
+          "and Standalone with dynamic allocation enabled.")
+      }
 
-    if (isStandalone && rp.getExecutorCores.isEmpty &&
-      sparkConf.getOption(config.EXECUTOR_CORES.key).isEmpty) {
-      logWarning("Neither executor cores is set for resource profile, nor spark.executor.cores " +
-        "is explicitly set, you may get more executors allocated than expected. It's recommended " +
-        "to set executor cores explicitly. Please check SPARK-30299 for more details.")
+      if (isStandalone && dynamicEnabled && rp.getExecutorCores.isEmpty &&
+        sparkConf.getOption(config.EXECUTOR_CORES.key).isEmpty) {
+        logWarning("Neither executor cores is set for resource profile, nor spark.executor.cores " +
+          "is explicitly set, you may get more executors allocated than expected. " +
+          "It's recommended to set executor cores explicitly. " +
+          "Please check SPARK-30299 for more details.")
+      }
     }
 
     true
   }
 
+  /**
+   * Check whether a task with a specific taskRpId can be scheduled to executors
+   * with executorRpId.
+   *
+   * Here are the rules:
+   * 1. Tasks with a [[TaskResourceProfile]] can be scheduled to executors with the
+   *    default resource profile when dynamic allocation is disabled.
+   * 2. Other tasks can only be scheduled to executors whose resource profile exactly matches.
+   */
+  private[spark] def canBeScheduled(taskRpId: Int, executorRpId: Int): Boolean = {
+    assert(resourceProfileIdToResourceProfile.contains(taskRpId) &&
+      resourceProfileIdToResourceProfile.contains(executorRpId),
+      "Tasks and executors must have valid resource profile id")
+    val taskRp = resourceProfileFromId(taskRpId)
+
+    taskRpId == executorRpId || (!dynamicEnabled && taskRp.isInstanceOf[TaskResourceProfile] &&
+      executorRpId == ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)

Review Comment:
   It seems like when creating a `TaskResourceProfile` (either via `ResourceProfileBuilder.build()` or by constructing one manually), the `TaskResourceProfile` always gets a unique rpId. However, I'd have expected it to use the constant rpId (i.e., `ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID`) in the case where dynamic allocation is disabled.
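
For reference, the rule that `canBeScheduled` in the hunk above encodes can be restated as a small standalone predicate (a sketch only; the boolean flag is a stand-in for the `isInstanceOf[TaskResourceProfile]` check, and `DEFAULT_RP_ID` mirrors `ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID`, which is 0):

```scala
val DEFAULT_RP_ID = 0

def canBeScheduled(taskRpId: Int, executorRpId: Int,
    taskUsesTaskResourceProfile: Boolean, dynamicEnabled: Boolean): Boolean =
  // An exact match always works; otherwise, with dynamic allocation off, a task that
  // only declares task resources may run on a default-profile executor.
  taskRpId == executorRpId ||
    (!dynamicEnabled && taskUsesTaskResourceProfile && executorRpId == DEFAULT_RP_ID)

// Example: a TaskResourceProfile task (rpId 1) on a default-profile executor (rpId 0)
// with dynamic allocation disabled is allowed:
// canBeScheduled(1, 0, taskUsesTaskResourceProfile = true, dynamicEnabled = false)  // true
```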

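Separately, the revised `isSupported` check earlier in the same hunk admits such a task-only profile on Standalone when dynamic allocation is disabled. A hedged configuration sketch for that mode (the master URL and resource amounts are placeholders; a real GPU deployment would also need a discovery script configured):

```scala
import org.apache.spark.SparkConf

// Standalone master with dynamic allocation off; per-stage task requests are then
// matched against executors created from the default profile (per the hunk above).
val conf = new SparkConf()
  .setMaster("spark://master:7077")                 // placeholder master URL
  .set("spark.dynamicAllocation.enabled", "false")
  .set("spark.executor.cores", "4")
  .set("spark.executor.resource.gpu.amount", "2")   // default-profile executor GPUs
  .set("spark.task.resource.gpu.amount", "1")       // default per-task GPU request
```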


##########
core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala:
##########
@@ -253,6 +263,38 @@ class ResourceProfile(
   }
 }
 
+/**
+ * A resource profile that only contains task resources. It can be used for stage-level task
+ * scheduling when dynamic allocation is disabled: tasks will be scheduled onto executors with
+ * the default resource profile, based on the task resources described by this profile.
+ * When dynamic allocation is enabled, new executors will be requested for this profile based on
+ * the default built-in executor resources, and tasks will be assigned by resource profile id.
+ *
+ * @param taskResources Resource requests for tasks. Mapped from the resource
+ *                      name (e.g., cores, memory, CPU) to its specific request.
+ */
+@Evolving
+@Since("3.4.0")
+class TaskResourceProfile (
+    override val taskResources: Map[String, TaskResourceRequest])
+  extends ResourceProfile(Map.empty, taskResources) {
+
+  override protected[spark] def getCustomExecutorResources():
+      Map[String, ExecutorResourceRequest] = {

Review Comment:
   nit:
   ```suggestion
     override protected[spark] def getCustomExecutorResources()
         : Map[String, ExecutorResourceRequest] = {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org