You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/05/29 14:08:10 UTC

[GitHub] [spark] ivoson opened a new pull request, #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

ivoson opened a new pull request, #36716:
URL: https://github.com/apache/spark/pull/36716

   ### What changes were proposed in this pull request?
   Add support of stage level scheduling for standalone cluster. The feature is enabled when dynamic allocation enabled in standalone cluster.
   - `ResourceProfile` support creating default resource profile for standalone cluster, enable `None` value for executor cores which would take all available cores from a worker.
   - Enable the feature for standalone cluster in `ResourceProfileManager`.
   - `ApplicationDescription` take default resource profile as resource requirement description for an app.
   - Change standalone/master interface `RequestExecutors` to support request executors with multiple resource profiles.
   - `ApplicationInfo` to maintain the resource profiles and executors information, and help to make the schedule decision.
   - `Master` will schedule executors for one app based on resource profiles in a FIFO mode, a smaller resource profiles id will be scheduled firstly.
   - Add argument `resource profile id` while starting executors.
   
   ### Why are the changes needed?
   Stage level schedule already supported for yarn cluster and k8s, add support for standalone cluster.
   
   ### Does this PR introduce _any_ user-facing change?
   Since we modified the metadata of apps in master and also standalone client/master interface, so the new cluster can not work with an elder version of spark client.
   
   ### How was this patch tested?
   UT and manually test in a local standalone cluster.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ivoson commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
ivoson commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r885759220


##########
core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala:
##########
@@ -43,8 +43,8 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
       return UIUtils.basicSparkPage(request, msg, "Not Found")
     }
 
-    val executorHeaders = Seq("ExecutorID", "Worker", "Cores", "Memory", "Resources",
-      "State", "Logs")
+    val executorHeaders = Seq("ExecutorID", "Worker", "Resource Profile Id", "Cores", "Memory",
+      "Resources", "State", "Logs")

Review Comment:
   Sure, of course. Will make the change. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ivoson commented on pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
ivoson commented on PR #36716:
URL: https://github.com/apache/spark/pull/36716#issuecomment-1154747061

   > I kind of disagree because it doesn't work as expected compared to other resource manager. This to me is very confusing. I kind of hate to add more features on what I would consider a broken feature and then people think its ok. If people are using this successfully and finding it useful, the very least we need to do if document how it works and its limitations. I somewhat remember hitting other issues with dynamic allocation in standalone but then giving up on it as I talked to a few people that said it wasn't used and figured someone would go through and test it. ie on that same issue I mention "Note that there are other places in the code that uses executor cores which could also be wrong in standalone mode. for instance PythonRunner is using it to split memory."
   > 
   > @ivoson I'm assuming you are using this successfully in production, how much testing have you done with dynamic allocation?
   
   Hi @tgravescs , thanks for pointing out the issue. Actually, we are not using this in production right now. Just find this feature very useful and we can leverage it, but it can't work with standalone cluster right now, so I want to support it.
   For your concern, I'd like to create another ticket to update the doc first to make people be aware of it.
   
   Thanks @tgravescs  and @Ngone51 for your guidance.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ivoson commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
ivoson commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r904913141


##########
core/src/test/scala/org/apache/spark/LocalSparkContext.scala:
##########
@@ -31,6 +31,8 @@ trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self
 
   override def beforeAll(): Unit = {
     super.beforeAll()
+    // Clear default profile, just in case default profile is not cleaned in other suites.
+    ResourceProfile.clearDefaultProfile()

Review Comment:
   Will take a look.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ivoson commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
ivoson commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r902793990


##########
core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala:
##########
@@ -336,9 +340,23 @@ object ResourceProfile extends Logging {
 
   private def getDefaultExecutorResources(conf: SparkConf): Map[String, ExecutorResourceRequest] = {
     val ereqs = new ExecutorResourceRequests()
-    val cores = conf.get(EXECUTOR_CORES)
-    ereqs.cores(cores)
-    val memory = conf.get(EXECUTOR_MEMORY)
+
+    val isStandalone = conf.getOption("spark.master").exists(_.startsWith("spark://"))
+    val isLocalCluster = conf.getOption("spark.master").exists(_.startsWith("local-cluster"))
+    // By default, standalone executors take all available cores, do not have a specific value.
+    val cores = if (isStandalone || isLocalCluster) {
+      conf.getOption(EXECUTOR_CORES.key).map(_.toInt)
+    } else {
+      Some(conf.get(EXECUTOR_CORES))
+    }
+    cores.foreach(ereqs.cores)
+
+    // Setting all resources here, cluster managers will take the resources they respect.

Review Comment:
   Sorry for the confusion. I am trying to say that besides executor cores and memory, we may also set memory overhead /offheap memory in spark conf, and these resources will finally exist in default resource profile . But cluster managers like standalone only use executors cores and memory, it will ignore other built-in resources.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ivoson commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
ivoson commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r894409213


##########
core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala:
##########
@@ -530,6 +535,87 @@ class MasterSuite extends SparkFunSuite
     schedulingWithEverything(spreadOut = false)
   }
 
+  test("scheduling for app with multiple resource profiles") {
+    scheduleExecutorsForAppWithMultiRPs(withMaxCores = false)
+  }
+
+  test("scheduling for app with multiple resource profiles with max cores") {
+    scheduleExecutorsForAppWithMultiRPs(withMaxCores = true)
+  }
+
+  private def scheduleExecutorsForAppWithMultiRPs(withMaxCores: Boolean): Unit = {
+    val appInfo: ApplicationInfo = if (withMaxCores) {
+      makeAppInfo(
+      1024, maxCores = Some(30), initialExecutorLimit = Some(0))
+    } else {
+      makeAppInfo(
+        1024, maxCores = None, initialExecutorLimit = Some(0))
+    }
+
+    val master = makeAliveMaster()
+    val conf = new SparkConf()
+    val workers = (1 to 4).map { idx =>
+      val worker = new MockWorker(master.self, conf)
+      worker.rpcEnv.setupEndpoint(s"worker-$idx", worker)
+      val workerReg = RegisterWorker(
+        worker.id,
+        "localhost",
+        worker.self.address.port,
+        worker.self,
+        10,
+        4096,
+        "http://localhost:8080",
+        RpcAddress("localhost", 10000))
+      master.self.send(workerReg)
+      worker
+    }
+
+    // Register app and schedule.
+    master.registerApplication(appInfo)
+    startExecutorsOnWorkers(master)
+    assert(appInfo.executors.isEmpty)
+
+    // Request executors with multiple resource profile.
+    val rp1 = DeployTestUtils.createResourceProfile(Some(2048), Map.empty, Some(5))
+    val rp2 = DeployTestUtils.createResourceProfile(Some(2048), Map.empty, Some(10))

Review Comment:
   done



##########
core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala:
##########
@@ -530,6 +535,87 @@ class MasterSuite extends SparkFunSuite
     schedulingWithEverything(spreadOut = false)
   }
 
+  test("scheduling for app with multiple resource profiles") {
+    scheduleExecutorsForAppWithMultiRPs(withMaxCores = false)
+  }
+
+  test("scheduling for app with multiple resource profiles with max cores") {
+    scheduleExecutorsForAppWithMultiRPs(withMaxCores = true)
+  }
+
+  private def scheduleExecutorsForAppWithMultiRPs(withMaxCores: Boolean): Unit = {
+    val appInfo: ApplicationInfo = if (withMaxCores) {
+      makeAppInfo(
+      1024, maxCores = Some(30), initialExecutorLimit = Some(0))

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on PR #36716:
URL: https://github.com/apache/spark/pull/36716#issuecomment-1154714325

   @tgravescs Thanks for your feedback. I agree it's a kind of dirty way to build new features upon the features with known issues, which hides the issues deeply in further. I'm also +1 on the proposal to update docs and warn users in the case of None cores. For a long term, we could think of a better solution for it. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on PR #36716:
URL: https://github.com/apache/spark/pull/36716#issuecomment-1171106781

   @ivoson Could you fix the GA failure?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] tgravescs commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
tgravescs commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r902710949


##########
core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala:
##########
@@ -336,9 +340,23 @@ object ResourceProfile extends Logging {
 
   private def getDefaultExecutorResources(conf: SparkConf): Map[String, ExecutorResourceRequest] = {
     val ereqs = new ExecutorResourceRequests()
-    val cores = conf.get(EXECUTOR_CORES)
-    ereqs.cores(cores)
-    val memory = conf.get(EXECUTOR_MEMORY)
+
+    val isStandalone = conf.getOption("spark.master").exists(_.startsWith("spark://"))
+    val isLocalCluster = conf.getOption("spark.master").exists(_.startsWith("local-cluster"))
+    // By default, standalone executors take all available cores, do not have a specific value.
+    val cores = if (isStandalone || isLocalCluster) {
+      conf.getOption(EXECUTOR_CORES.key).map(_.toInt)
+    } else {
+      Some(conf.get(EXECUTOR_CORES))
+    }
+    cores.foreach(ereqs.cores)
+
+    // Setting all resources here, cluster managers will take the resources they respect.

Review Comment:
   I'm not sure what this comment means "cluster managers will take the resources they respect". could you clarify



##########
core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala:
##########
@@ -54,6 +54,7 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf,
   private val master = sparkConf.getOption("spark.master")
   private val isYarn = master.isDefined && master.get.equals("yarn")
   private val isK8s = master.isDefined && master.get.startsWith("k8s://")
+  private val isStandalone = master.isDefined && master.get.startsWith("spark://")

Review Comment:
   what about local cluster here?  it would be nice to update description if adding support for it as well. Otherwise remove code from ResourceProfile



##########
core/src/test/scala/org/apache/spark/LocalSparkContext.scala:
##########
@@ -31,6 +31,8 @@ trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self
 
   override def beforeAll(): Unit = {
     super.beforeAll()
+    // Clear default profile, just in case default profile is not cleaned in other suites.
+    ResourceProfile.clearDefaultProfile()

Review Comment:
   seems like this should be unnecessary, were you getting test issues with this?



##########
core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala:
##########
@@ -402,7 +420,7 @@ object ResourceProfile extends Logging {
   }
 
   private[spark] case class ExecutorResourcesOrDefaults(
-      cores: Int,
+      cores: Option[Int], // Can only be None for standalone cluster.

Review Comment:
   standalone and local-cluster?



##########
docs/spark-standalone.md:
##########
@@ -455,6 +455,14 @@ if the worker has enough cores and memory. Otherwise, each executor grabs all th
 on the worker by default, in which case only one executor per application may be launched on each
 worker during one single schedule iteration.
 
+# Stage Level Scheduling Overview
+
+Stage level scheduling is supported on Standalone when dynamic allocation is enabled. Currently, when Master allocates executors for one application, will schedule based on the order of the ResourceProfile ids for multiple ResourceProfiles. The ResourceProfile with smaller id will be scheduled firstly. Normally this won’t matter as Spark finishes one stage before starting another one, the only case this might have an affect is in a job server type scenario, so its something to keep in mind. For scheduling, we will only take executor memory and executor cores from built-in executor resources and all other custom resources from a ResourceProfile, other built-in executor resources such as offHeap and memoryOverhead won't take any effect. The base default profile will be created based on the spark configs when you submit an application. Executor memory and executor cores from the base default profile can be propagated to custom ResourceProfiles, but all other custom resources can not 
 be propagated.

Review Comment:
   "when the Master"
   "it will schedule"
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r886345400


##########
core/src/main/scala/org/apache/spark/deploy/master/ResourceDescription.scala:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import org.apache.spark.resource.ResourceRequirement
+
+/**
+ * Describe resource requests for different resource profiles. Used for executor schedule.
+ *
+ * @param coresPerExecutor cores for each executor.
+ * @param memoryMbPerExecutor memory for each executor.
+ * @param customResourcesPerExecutor custom resource requests for each executor.
+ */
+private[spark] case class ResourceDescription(

Review Comment:
   Had another look around `ExecutorResourcesOrDefaults`, it looks like it's a general abstraction of executor resources that is shared by various cluster managers. So I think it makes sense to extract a specific resource description separately for the Standalone itself. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r885313120


##########
core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala:
##########
@@ -65,7 +66,70 @@ private[spark] class ApplicationInfo(
     appSource = new ApplicationSource(this)
     nextExecutorId = 0
     removedExecutors = new ArrayBuffer[ExecutorDesc]
-    executorLimit = desc.initialExecutorLimit.getOrElse(Integer.MAX_VALUE)
+    val initialExecutorLimit = desc.initialExecutorLimit.getOrElse(Integer.MAX_VALUE)
+
+    rpIdToResourceProfile = new mutable.HashMap[Int, ResourceProfile]()
+    rpIdToResourceProfile(DEFAULT_RESOURCE_PROFILE_ID) = desc.defaultProfile
+    rpIdToResourceDesc = new mutable.HashMap[Int, ResourceDescription]()
+    createResourceDescForResourceProfile(desc.defaultProfile)
+
+    targetNumExecutorsPerResourceProfileId = new mutable.HashMap[Int, Int]()
+    targetNumExecutorsPerResourceProfileId(DEFAULT_RESOURCE_PROFILE_ID) = initialExecutorLimit

Review Comment:
   I think the original `executorLimit` limits the total executor number of the whole application. But now it looks like `initialExecutorLimit` only limits the executor number of the `DEFAULT_RESOURCE_PROFILE_ID` type. How about other resource profile types?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ivoson commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
ivoson commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r894409370


##########
core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala:
##########
@@ -299,9 +300,10 @@ private[spark] class StandaloneAppClient(
    *
    * @return whether the request is acknowledged.
    */
-  def requestTotalExecutors(requestedTotal: Int): Future[Boolean] = {

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r890982027


##########
core/src/main/scala/org/apache/spark/deploy/master/Master.scala:
##########
@@ -725,26 +729,38 @@ private[deploy] class Master(
    */
   private def startExecutorsOnWorkers(): Unit = {
     // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
-    // in the queue, then the second app, etc.
+    // in the queue, then the second app, etc. And for each app, we will schedule base on
+    // resource profiles also with a simple FIFO scheduler, resource profile with smaller id
+    // first.

Review Comment:
   I'd suggest to schedule in the order of the resource profile reuqests instead of the smaller id first. In the case of the resource profile is resued for later on RDD computation, the samller id doesn't seem to has the priority over other resource profiles. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ivoson commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
ivoson commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r893573166


##########
core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala:
##########
@@ -530,6 +535,87 @@ class MasterSuite extends SparkFunSuite
     schedulingWithEverything(spreadOut = false)
   }
 
+  test("scheduling for app with multiple resource profiles") {
+    scheduleExecutorsForAppWithMultiRPs(withMaxCores = false)
+  }
+
+  test("scheduling for app with multiple resource profiles with max cores") {
+    scheduleExecutorsForAppWithMultiRPs(withMaxCores = true)
+  }
+
+  private def scheduleExecutorsForAppWithMultiRPs(withMaxCores: Boolean): Unit = {
+    val appInfo: ApplicationInfo = if (withMaxCores) {
+      makeAppInfo(
+      1024, maxCores = Some(30), initialExecutorLimit = Some(0))
+    } else {
+      makeAppInfo(
+        1024, maxCores = None, initialExecutorLimit = Some(0))
+    }
+
+    val master = makeAliveMaster()
+    val conf = new SparkConf()
+    val workers = (1 to 4).map { idx =>
+      val worker = new MockWorker(master.self, conf)
+      worker.rpcEnv.setupEndpoint(s"worker-$idx", worker)
+      val workerReg = RegisterWorker(
+        worker.id,
+        "localhost",
+        worker.self.address.port,
+        worker.self,
+        10,
+        4096,
+        "http://localhost:8080",
+        RpcAddress("localhost", 10000))
+      master.self.send(workerReg)
+      worker
+    }
+
+    // Register app and schedule.
+    master.registerApplication(appInfo)
+    startExecutorsOnWorkers(master)
+    assert(appInfo.executors.isEmpty)
+
+    // Request executors with multiple resource profile.
+    val rp1 = DeployTestUtils.createResourceProfile(Some(2048), Map.empty, Some(5))
+    val rp2 = DeployTestUtils.createResourceProfile(Some(2048), Map.empty, Some(10))

Review Comment:
   Sure, will add the test case. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] tgravescs commented on pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
tgravescs commented on PR #36716:
URL: https://github.com/apache/spark/pull/36716#issuecomment-1154005738

   assuming we would allow this with dynamic allocation doc updates, it needs doc updates for stage level scheduling sections and be sure to talk about None cores option and perhaps how cores work in general.
   
   I'm assuming if a user uses the core option with the ResourceProfile they are going to get the same behavior as https://issues.apache.org/jira/browse/SPARK-30299  which means it doesn't really work, correct?  If the answer is the user has to specify EXECUTOR_CORES and then everything works fine again I guess I'm ok with it as long as its documented and it would be nice if we could catch this case and error or at least warn the user


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r885377381


##########
core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala:
##########
@@ -43,8 +43,8 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
       return UIUtils.basicSparkPage(request, msg, "Not Found")
     }
 
-    val executorHeaders = Seq("ExecutorID", "Worker", "Cores", "Memory", "Resources",
-      "State", "Logs")
+    val executorHeaders = Seq("ExecutorID", "Worker", "Resource Profile Id", "Cores", "Memory",
+      "Resources", "State", "Logs")

Review Comment:
   Could you leave the positions of "Cores" and "Memory" unchanged and put "Resource Profile Id" together with "Resources"?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ivoson commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
ivoson commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r885758451


##########
core/src/main/scala/org/apache/spark/deploy/master/ResourceDescription.scala:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import org.apache.spark.resource.ResourceRequirement
+
+/**
+ * Describe resource requests for different resource profiles. Used for executor schedule.
+ *
+ * @param coresPerExecutor cores for each executor.
+ * @param memoryMbPerExecutor memory for each executor.
+ * @param customResourcesPerExecutor custom resource requests for each executor.
+ */
+private[spark] case class ResourceDescription(

Review Comment:
   Thanks for the advise. `ExecutorResourceDescription ` sounds better.
   
   The reason I introduce this class it that, standalone cluster schedule based on coresPerExecutor, memoryMbPerExecutor, and also customResources.
   The main difference is that `customResourcesPerExecutor` is described with type Seq of [ResourceRequirement](https://github.com/apache/spark/blob/v3.3.0-rc3/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala#L116) here which is more friendly for schedule in master.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r891129088


##########
core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala:
##########
@@ -107,11 +107,11 @@ object JsonConstants {
       |{"id":"id","starttime":3,"name":"name",
       |"cores":0,"user":"%s",
       |"memoryperexecutor":1234,
-      |"resourcesperexecutor":[{"name":"gpu",
-      |"amount":3},{"name":"fpga","amount":3}],
+      |"resourcesperexecutor":[{"name":"fpga",
+      |"amount":3},{"name":"gpu","amount":3}],

Review Comment:
   Why the order changes?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ivoson commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
ivoson commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r894409767


##########
core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala:
##########
@@ -43,8 +43,8 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
       return UIUtils.basicSparkPage(request, msg, "Not Found")
     }
 
-    val executorHeaders = Seq("ExecutorID", "Worker", "Cores", "Memory", "Resources",
-      "State", "Logs")
+    val executorHeaders = Seq("ExecutorID", "Worker", "Resource Profile Id", "Cores", "Memory",
+      "Resources", "State", "Logs")

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ivoson commented on pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
ivoson commented on PR #36716:
URL: https://github.com/apache/spark/pull/36716#issuecomment-1171268449

   > @ivoson Could you fix the GA failure?
   
   Thanks @Ngone51 , rebased master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on PR #36716:
URL: https://github.com/apache/spark/pull/36716#issuecomment-1169628455

   @tgravescs do you have more concerns?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ivoson commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
ivoson commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r902795080


##########
docs/job-scheduling.md:
##########
@@ -83,6 +83,10 @@ This feature is disabled by default and available on all coarse-grained cluster
 [Mesos coarse-grained mode](running-on-mesos.html#mesos-run-modes) and [K8s mode](running-on-kubernetes.html).
 
 
+### Caveats
+
+- In [standalone mode](spark-standalone.html), without explicitly setting cores for each executor, executors will get all the cores of a worker. In this case, when dynamic allocation enabled, spark will possibly acquire much more executors than expected. When you want to use dynamic allocation in [standalone mode](spark-standalone.html), you are recommended to explicitly set cores for each executor before the issue [SPARK-30299](https://issues.apache.org/jira/browse/SPARK-30299) got fixed.

Review Comment:
   > I didn't see where this case is warned in the code. Could you add it?
   
   Have added some warning logs for this. Would this be enough?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ivoson commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
ivoson commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r902806648


##########
docs/spark-standalone.md:
##########
@@ -455,6 +455,14 @@ if the worker has enough cores and memory. Otherwise, each executor grabs all th
 on the worker by default, in which case only one executor per application may be launched on each
 worker during one single schedule iteration.
 
+# Stage Level Scheduling Overview
+
+Stage level scheduling is supported on Standalone when dynamic allocation is enabled. Currently, when Master allocates executors for one application, will schedule based on the order of the ResourceProfile ids for multiple ResourceProfiles. The ResourceProfile with smaller id will be scheduled firstly. Normally this won’t matter as Spark finishes one stage before starting another one, the only case this might have an affect is in a job server type scenario, so its something to keep in mind. For scheduling, we will only take executor memory and executor cores from built-in executor resources and all other custom resources from a ResourceProfile, other built-in executor resources such as offHeap and memoryOverhead won't take any effect. The base default profile will be created based on the spark configs when you submit an application. Executor memory and executor cores from the base default profile can be propagated to custom ResourceProfiles, but all other custom resources can not 
 be propagated.

Review Comment:
   Thanks, will update the doc here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] tgravescs commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
tgravescs commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r903861092


##########
core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala:
##########
@@ -54,6 +54,7 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf,
   private val master = sparkConf.getOption("spark.master")
   private val isYarn = master.isDefined && master.get.equals("yarn")
   private val isK8s = master.isDefined && master.get.startsWith("k8s://")
+  private val isStandalone = master.isDefined && master.get.startsWith("spark://")

Review Comment:
   yes makes sense



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ivoson commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
ivoson commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r906771052


##########
core/src/test/scala/org/apache/spark/LocalSparkContext.scala:
##########
@@ -31,6 +31,8 @@ trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self
 
   override def beforeAll(): Unit = {
     super.beforeAll()
+    // Clear default profile, just in case default profile is not cleaned in other suites.
+    ResourceProfile.clearDefaultProfile()

Review Comment:
   Done. After cleaning up default resource profile in `KubernetesClusterSchedulerBackendSuite` and `KubernetesExecutorBuilderSuite`, all UTs passed. I removed the code here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ivoson commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
ivoson commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r893495301


##########
core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala:
##########
@@ -25,10 +25,13 @@ package org.apache.spark.deploy
 private[deploy] class ExecutorDescription(
     val appId: String,
     val execId: Int,
+    val rpId: Int,
     val cores: Int,
+    val memoryMb: Int,

Review Comment:
   Yes, correct. Master won't allocate new executors before next resource requests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ivoson commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
ivoson commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r893516966


##########
core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala:
##########
@@ -65,7 +66,70 @@ private[spark] class ApplicationInfo(
     appSource = new ApplicationSource(this)
     nextExecutorId = 0
     removedExecutors = new ArrayBuffer[ExecutorDesc]
-    executorLimit = desc.initialExecutorLimit.getOrElse(Integer.MAX_VALUE)
+    val initialExecutorLimit = desc.initialExecutorLimit.getOrElse(Integer.MAX_VALUE)
+
+    rpIdToResourceProfile = new mutable.HashMap[Int, ResourceProfile]()
+    rpIdToResourceProfile(DEFAULT_RESOURCE_PROFILE_ID) = desc.defaultProfile
+    rpIdToResourceDesc = new mutable.HashMap[Int, ExecutorResourceDescription]()
+    createResourceDescForResourceProfile(desc.defaultProfile)
+
+    targetNumExecutorsPerResourceProfileId = new mutable.HashMap[Int, Int]()
+    targetNumExecutorsPerResourceProfileId(DEFAULT_RESOURCE_PROFILE_ID) = initialExecutorLimit
+
+    executorsPerResourceProfileId = new mutable.HashMap[Int, mutable.Set[Int]]()
+  }
+
+  private[deploy] def getOrUpdateExecutorsForRPId(rpId: Int): mutable.Set[Int] = {
+    executorsPerResourceProfileId.getOrElseUpdate(rpId, mutable.HashSet[Int]())
+  }
+
+  private[deploy] def getTargetExecutorNumForRPId(rpId: Int): Int = {
+    targetNumExecutorsPerResourceProfileId.getOrElse(rpId, 0)
+  }
+
+  private[deploy] def getRequestedRPIds(): Seq[Int] = {
+    rpIdToResourceProfile.keys.toSeq.sorted
+  }
+
+  private def createResourceDescForResourceProfile(resourceProfile: ResourceProfile): Unit = {
+    if (!rpIdToResourceDesc.contains(resourceProfile.id)) {
+      val defaultMemoryMbPerExecutor = desc.memoryPerExecutorMB
+      val defaultCoresPerExecutor = desc.coresPerExecutor
+      val coresPerExecutor = resourceProfile.getExecutorCores
+        .orElse(defaultCoresPerExecutor)
+      val memoryMbPerExecutor = resourceProfile.getExecutorMemory
+        .map(_.toInt)
+        .getOrElse(defaultMemoryMbPerExecutor)
+      val customResources = ResourceUtils.executorResourceRequestToRequirement(
+        getCustomExecutorResources(resourceProfile).values.toSeq)
+
+      rpIdToResourceDesc(resourceProfile.id) =
+        ExecutorResourceDescription(coresPerExecutor, memoryMbPerExecutor, customResources)
+    }
+  }
+
+  // Get resources required for schedule.
+  private[deploy] def getResourceDescriptionForRpId(rpId: Int): ExecutorResourceDescription = {
+    rpIdToResourceDesc(rpId)
+  }
+
+  private[deploy] def requestExecutors(
+      resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Unit = {
+    resourceProfileToTotalExecs.foreach { case (rp, num) =>
+      createResourceDescForResourceProfile(rp)
+
+      if (!rpIdToResourceProfile.contains(rp.id)) {
+        rpIdToResourceProfile(rp.id) = rp
+      }
+
+      if (!targetNumExecutorsPerResourceProfileId.get(rp.id).contains(num)) {
+        targetNumExecutorsPerResourceProfileId(rp.id) = num
+      }

Review Comment:
   Sure, thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on PR #36716:
URL: https://github.com/apache/spark/pull/36716#issuecomment-1151073534

   > So last time I checked dynamic allocation in standalone mode had issues. Have this been addressed or do you not see those issues? one such issue was https://issues.apache.org/jira/browse/SPARK-30299?filter=-2 but I thought there was others.
   
   SPARK-30299 seems to be a long standing issue. For a standalone cluster with the default EXECUTOR_CORES, I think we have no idea to answer how many executors we needs but only how many cores we needs. So answering "how many cores we needs" may be a direction to fix the issue in this case but it'd of course require a lot of changes to support. 
   
   That said, I think Standalone can still work well with dynamic allocation as along as the EXECUTOR_CORES is explicitly configured. So I think we may not have to block on that issue. 
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r885557272


##########
core/src/main/scala/org/apache/spark/deploy/master/ResourceDescription.scala:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import org.apache.spark.resource.ResourceRequirement
+
+/**
+ * Describe resource requests for different resource profiles. Used for executor schedule.
+ *
+ * @param coresPerExecutor cores for each executor.
+ * @param memoryMbPerExecutor memory for each executor.
+ * @param customResourcesPerExecutor custom resource requests for each executor.
+ */
+private[spark] case class ResourceDescription(

Review Comment:
   Shall we reuse `ExecutorResourcesOrDefaults` to replace `ResourceDescription`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on PR #36716:
URL: https://github.com/apache/spark/pull/36716#issuecomment-1148335486

   cc @tgravescs for review


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r890938856


##########
core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala:
##########
@@ -25,10 +25,13 @@ package org.apache.spark.deploy
 private[deploy] class ExecutorDescription(
     val appId: String,
     val execId: Int,
+    val rpId: Int,
     val cores: Int,
+    val memoryMb: Int,

Review Comment:
   > And in master, we can only reconstruct the resource profile information in ApplicationInfo after client send resource request RequestExecutors
   
   Does it mean we can't launch new executors with the specific `rpId` until the client sends the request with the corresponding resource profile? For example, if the number of executos with the specific `rpId` hasn't reached the targer number, it seems like we can't schedule new executors for it until we know resource profile by `RequestExecutors`, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ivoson commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
ivoson commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r893565176


##########
core/src/main/scala/org/apache/spark/deploy/master/Master.scala:
##########
@@ -725,26 +729,38 @@ private[deploy] class Master(
    */
   private def startExecutorsOnWorkers(): Unit = {
     // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
-    // in the queue, then the second app, etc.
+    // in the queue, then the second app, etc. And for each app, we will schedule base on
+    // resource profiles also with a simple FIFO scheduler, resource profile with smaller id
+    // first.

Review Comment:
   Yes, you are right. Smaller id does not indicate higher priority directly. The strategy is simple and straightforward, relying on the dynamic allocation manager to adjust the target number for each resource profile, and we just fulfill the requirements in a fixed order.
   
   Currently, we don't have a good way to infer about the order of requests for different resource profiles. The information we have are the target number and existing executor number for each resource profile. Maybe we can compute the priority somehow in a FAIR schedule mode.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] tgravescs commented on pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
tgravescs commented on PR #36716:
URL: https://github.com/apache/spark/pull/36716#issuecomment-1153964695

   I kind of disagree because it doesn't work as expected compared to other resource manager.   This to me is very confusing.  I kind of hate to add more features on what I would consider a broken feature and then people think its ok.  
   If people are using this successfully and finding it useful, the very least we need to do if document how it works and its limitations.
   I somewhat remember hitting other issues with dynamic allocation in standalone but then giving up on it as I talked to a few people that said it wasn't used and figured someone would go through and test it. 
   
   @ivoson I'm assuming you are using this successfully in production, how much testing have you done with dynamic allocation?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ivoson commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
ivoson commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r904917522


##########
core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala:
##########
@@ -336,9 +340,23 @@ object ResourceProfile extends Logging {
 
   private def getDefaultExecutorResources(conf: SparkConf): Map[String, ExecutorResourceRequest] = {
     val ereqs = new ExecutorResourceRequests()
-    val cores = conf.get(EXECUTOR_CORES)
-    ereqs.cores(cores)
-    val memory = conf.get(EXECUTOR_MEMORY)
+
+    val isStandalone = conf.getOption("spark.master").exists(_.startsWith("spark://"))
+    val isLocalCluster = conf.getOption("spark.master").exists(_.startsWith("local-cluster"))
+    // By default, standalone executors take all available cores, do not have a specific value.
+    val cores = if (isStandalone || isLocalCluster) {
+      conf.getOption(EXECUTOR_CORES.key).map(_.toInt)
+    } else {
+      Some(conf.get(EXECUTOR_CORES))
+    }
+    cores.foreach(ereqs.cores)
+
+    // Setting all resources here, cluster managers will take the resources they respect.

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] tgravescs commented on pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
tgravescs commented on PR #36716:
URL: https://github.com/apache/spark/pull/36716#issuecomment-1170021683

   No lgtm 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 closed pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
Ngone51 closed pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster
URL: https://github.com/apache/spark/pull/36716


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on PR #36716:
URL: https://github.com/apache/spark/pull/36716#issuecomment-1174530107

   Thanks merged to master!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ivoson commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
ivoson commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r902839471


##########
core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala:
##########
@@ -402,7 +420,7 @@ object ResourceProfile extends Logging {
   }
 
   private[spark] case class ExecutorResourcesOrDefaults(
-      cores: Int,
+      cores: Option[Int], // Can only be None for standalone cluster.

Review Comment:
   Thanks, done.



##########
docs/spark-standalone.md:
##########
@@ -455,6 +455,14 @@ if the worker has enough cores and memory. Otherwise, each executor grabs all th
 on the worker by default, in which case only one executor per application may be launched on each
 worker during one single schedule iteration.
 
+# Stage Level Scheduling Overview
+
+Stage level scheduling is supported on Standalone when dynamic allocation is enabled. Currently, when Master allocates executors for one application, will schedule based on the order of the ResourceProfile ids for multiple ResourceProfiles. The ResourceProfile with smaller id will be scheduled firstly. Normally this won’t matter as Spark finishes one stage before starting another one, the only case this might have an affect is in a job server type scenario, so its something to keep in mind. For scheduling, we will only take executor memory and executor cores from built-in executor resources and all other custom resources from a ResourceProfile, other built-in executor resources such as offHeap and memoryOverhead won't take any effect. The base default profile will be created based on the spark configs when you submit an application. Executor memory and executor cores from the base default profile can be propagated to custom ResourceProfiles, but all other custom resources can not 
 be propagated.

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ivoson commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
ivoson commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r902835745


##########
core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala:
##########
@@ -402,7 +420,7 @@ object ResourceProfile extends Logging {
   }
 
   private[spark] case class ExecutorResourcesOrDefaults(
-      cores: Int,
+      cores: Option[Int], // Can only be None for standalone cluster.

Review Comment:
   I think this is for multiple resource profiles/stage level scheduling. Currently we didn't add the support for local-cluster.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ivoson commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
ivoson commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r904917849


##########
core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala:
##########
@@ -336,9 +340,23 @@ object ResourceProfile extends Logging {
 
   private def getDefaultExecutorResources(conf: SparkConf): Map[String, ExecutorResourceRequest] = {
     val ereqs = new ExecutorResourceRequests()
-    val cores = conf.get(EXECUTOR_CORES)
-    ereqs.cores(cores)
-    val memory = conf.get(EXECUTOR_MEMORY)
+
+    val isStandalone = conf.getOption("spark.master").exists(_.startsWith("spark://"))
+    val isLocalCluster = conf.getOption("spark.master").exists(_.startsWith("local-cluster"))

Review Comment:
   Thanks, done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ivoson commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
ivoson commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r885722489


##########
core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala:
##########
@@ -25,10 +25,13 @@ package org.apache.spark.deploy
 private[deploy] class ExecutorDescription(
     val appId: String,
     val execId: Int,
+    val rpId: Int,
     val cores: Int,
+    val memoryMb: Int,

Review Comment:
   We will need the memoryMb here when [master failover.](https://github.com/apache/spark/blob/v3.3.0-rc3/core/src/main/scala/org/apache/spark/deploy/master/Master.scala#L351) Master will reconstruct the app's executor information and worker's resource usage information.
   
   ```
   for (exec <- validExecutors) {
       val (execDesc, execResources) = (exec.desc, exec.resources)
       val app = idToApp(execDesc.appId)
       val execInfo = app.addExecutor(
           worker, execDesc.cores, execResources, Some(execDesc.execId))
       worker.addExecutor(execInfo)
       worker.recoverResources(execResources)
       execInfo.copyState(execDesc)
   }
   ```
   
   We need to know the memoryMb of the executor while creating [`ExecutorDesc`](https://github.com/apache/spark/blob/v3.3.0-rc3/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala#L88). And previously, we can get the memoryMb information from `ApplicaionDescription` which could be recovered from state-store for recovery.
   But currently, we may have multiple resource profiles, and we don't have the resource profiles information written in the state-store, so we need worker to record the memory information.
   
   And in master, we can only reconstruct the resource profile information in `ApplicationInfo` after client send resource request `RequestExecutors`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ivoson commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
ivoson commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r894409531


##########
core/src/main/scala/org/apache/spark/deploy/master/ExecutorResourceDescription.scala:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import org.apache.spark.resource.ResourceRequirement
+
+/**
+ * Describe resource requests for different resource profiles. Used for executor schedule.
+ *
+ * @param coresPerExecutor cores for each executor.
+ * @param memoryMbPerExecutor memory for each executor.
+ * @param customResourcesPerExecutor custom resource requests for each executor.

Review Comment:
   done



##########
core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala:
##########
@@ -19,23 +19,28 @@ package org.apache.spark.deploy
 
 import java.net.URI
 
-import org.apache.spark.resource.ResourceRequirement
+import org.apache.spark.resource.{ResourceProfile, ResourceRequirement, ResourceUtils}
+import org.apache.spark.resource.ResourceProfile.getCustomExecutorResources
 
 private[spark] case class ApplicationDescription(
     name: String,
     maxCores: Option[Int],
-    memoryPerExecutorMB: Int,
     command: Command,
     appUiUrl: String,
+    defaultProfile: ResourceProfile,
     eventLogDir: Option[URI] = None,
     // short name of compression codec used when writing event logs, if any (e.g. lzf)
     eventLogCodec: Option[String] = None,
-    coresPerExecutor: Option[Int] = None,
     // number of executors this application wants to start with,
     // only used if dynamic allocation is enabled
     initialExecutorLimit: Option[Int] = None,
-    user: String = System.getProperty("user.name", "<unknown>"),
-    resourceReqsPerExecutor: Seq[ResourceRequirement] = Seq.empty) {
+    user: String = System.getProperty("user.name", "<unknown>")) {
+
+  def memoryPerExecutorMB: Int = defaultProfile.getExecutorMemory.map(_.toInt).getOrElse(1024)
+  def coresPerExecutor: Option[Int] = defaultProfile.getExecutorCores
+  def resourceReqsPerExecutor: Seq[ResourceRequirement] =
+    ResourceUtils.executorResourceRequestToRequirement(
+      getCustomExecutorResources(defaultProfile).values.toSeq.sortBy(_.resourceName))

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ivoson commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
ivoson commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r893511546


##########
core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala:
##########
@@ -19,23 +19,28 @@ package org.apache.spark.deploy
 
 import java.net.URI
 
-import org.apache.spark.resource.ResourceRequirement
+import org.apache.spark.resource.{ResourceProfile, ResourceRequirement, ResourceUtils}
+import org.apache.spark.resource.ResourceProfile.getCustomExecutorResources
 
 private[spark] case class ApplicationDescription(
     name: String,
     maxCores: Option[Int],
-    memoryPerExecutorMB: Int,
     command: Command,
     appUiUrl: String,
+    defaultProfile: ResourceProfile,
     eventLogDir: Option[URI] = None,
     // short name of compression codec used when writing event logs, if any (e.g. lzf)
     eventLogCodec: Option[String] = None,
-    coresPerExecutor: Option[Int] = None,
     // number of executors this application wants to start with,
     // only used if dynamic allocation is enabled
     initialExecutorLimit: Option[Int] = None,
-    user: String = System.getProperty("user.name", "<unknown>"),
-    resourceReqsPerExecutor: Seq[ResourceRequirement] = Seq.empty) {
+    user: String = System.getProperty("user.name", "<unknown>")) {
+
+  def memoryPerExecutorMB: Int = defaultProfile.getExecutorMemory.map(_.toInt).getOrElse(1024)
+  def coresPerExecutor: Option[Int] = defaultProfile.getExecutorCores
+  def resourceReqsPerExecutor: Seq[ResourceRequirement] =
+    ResourceUtils.executorResourceRequestToRequirement(
+      getCustomExecutorResources(defaultProfile).values.toSeq.sortBy(_.resourceName))

Review Comment:
   This is used in Web UI and also UT. The sort here just want to make sure we have a fixed and predictable order.
   
   And the order changes in the UT [JsonProtocolSuite](https://github.com/apache/spark/pull/36716/files/beae4f7a50e041a78dae5bb2814a37a0e039c848#diff-aa7bbf69f2e6987aa4a22eb55b12e024c60c2110bb81ad9a7fbce44000ac048f) caused by this change.
   
   Does this make sense? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r893599247


##########
core/src/main/scala/org/apache/spark/deploy/master/Master.scala:
##########
@@ -725,26 +729,38 @@ private[deploy] class Master(
    */
   private def startExecutorsOnWorkers(): Unit = {
     // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
-    // in the queue, then the second app, etc.
+    // in the queue, then the second app, etc. And for each app, we will schedule base on
+    // resource profiles also with a simple FIFO scheduler, resource profile with smaller id
+    // first.

Review Comment:
   > Currently, we don't have a good way to infer about the order of requests for different resource profiles.
   
   I actually means the order of receiving the request in Master, although I know it could be out of order compared to the request sender (driver) due to asynchronous RPC framework. But after a second thinking, requests come from the pending tasks, which are able to be scheduled in parallel as long as there're enough resources. So it doesn't really matter which resource profile should be used to launch executors. Schedule by ordered resource profile ids should be enough.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ivoson commented on pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
ivoson commented on PR #36716:
URL: https://github.com/apache/spark/pull/36716#issuecomment-1157720575

   cc @Ngone51 @tgravescs docs updated, could you please take a look? Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r893583053


##########
core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala:
##########
@@ -19,23 +19,28 @@ package org.apache.spark.deploy
 
 import java.net.URI
 
-import org.apache.spark.resource.ResourceRequirement
+import org.apache.spark.resource.{ResourceProfile, ResourceRequirement, ResourceUtils}
+import org.apache.spark.resource.ResourceProfile.getCustomExecutorResources
 
 private[spark] case class ApplicationDescription(
     name: String,
     maxCores: Option[Int],
-    memoryPerExecutorMB: Int,
     command: Command,
     appUiUrl: String,
+    defaultProfile: ResourceProfile,
     eventLogDir: Option[URI] = None,
     // short name of compression codec used when writing event logs, if any (e.g. lzf)
     eventLogCodec: Option[String] = None,
-    coresPerExecutor: Option[Int] = None,
     // number of executors this application wants to start with,
     // only used if dynamic allocation is enabled
     initialExecutorLimit: Option[Int] = None,
-    user: String = System.getProperty("user.name", "<unknown>"),
-    resourceReqsPerExecutor: Seq[ResourceRequirement] = Seq.empty) {
+    user: String = System.getProperty("user.name", "<unknown>")) {
+
+  def memoryPerExecutorMB: Int = defaultProfile.getExecutorMemory.map(_.toInt).getOrElse(1024)
+  def coresPerExecutor: Option[Int] = defaultProfile.getExecutorCores
+  def resourceReqsPerExecutor: Seq[ResourceRequirement] =
+    ResourceUtils.executorResourceRequestToRequirement(
+      getCustomExecutorResources(defaultProfile).values.toSeq.sortBy(_.resourceName))

Review Comment:
   Make sense. So let's make all of them be consistent? Also sort it in `ApplicationInfo`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ivoson commented on pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
ivoson commented on PR #36716:
URL: https://github.com/apache/spark/pull/36716#issuecomment-1155198209

   > so I'm ok for documenting the dynamic allocation separately though should be a blocker for the same release, but really it overlaps with this because this uses dynamic allocation and this pr needs to update the docs for stage level scheduling, including any caveats.
   
   Thanks @tgravescs . Then I will update the docs for both standalone dynamic allocation and stage level scheduling in this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] tgravescs commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
tgravescs commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r903860857


##########
core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala:
##########
@@ -336,9 +340,23 @@ object ResourceProfile extends Logging {
 
   private def getDefaultExecutorResources(conf: SparkConf): Map[String, ExecutorResourceRequest] = {
     val ereqs = new ExecutorResourceRequests()
-    val cores = conf.get(EXECUTOR_CORES)
-    ereqs.cores(cores)
-    val memory = conf.get(EXECUTOR_MEMORY)
+
+    val isStandalone = conf.getOption("spark.master").exists(_.startsWith("spark://"))
+    val isLocalCluster = conf.getOption("spark.master").exists(_.startsWith("local-cluster"))

Review Comment:
   add comment about local cluster here.  like the one you replied to my comment below.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ivoson commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
ivoson commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r904912500


##########
core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala:
##########
@@ -336,9 +340,23 @@ object ResourceProfile extends Logging {
 
   private def getDefaultExecutorResources(conf: SparkConf): Map[String, ExecutorResourceRequest] = {
     val ereqs = new ExecutorResourceRequests()
-    val cores = conf.get(EXECUTOR_CORES)
-    ereqs.cores(cores)
-    val memory = conf.get(EXECUTOR_MEMORY)
+
+    val isStandalone = conf.getOption("spark.master").exists(_.startsWith("spark://"))
+    val isLocalCluster = conf.getOption("spark.master").exists(_.startsWith("local-cluster"))
+    // By default, standalone executors take all available cores, do not have a specific value.
+    val cores = if (isStandalone || isLocalCluster) {
+      conf.getOption(EXECUTOR_CORES.key).map(_.toInt)
+    } else {
+      Some(conf.get(EXECUTOR_CORES))
+    }
+    cores.foreach(ereqs.cores)
+
+    // Setting all resources here, cluster managers will take the resources they respect.

Review Comment:
   Thanks for the suggestion, will remove it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ivoson commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
ivoson commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r902794551


##########
docs/job-scheduling.md:
##########
@@ -83,6 +83,10 @@ This feature is disabled by default and available on all coarse-grained cluster
 [Mesos coarse-grained mode](running-on-mesos.html#mesos-run-modes) and [K8s mode](running-on-kubernetes.html).
 
 
+### Caveats
+
+- In [standalone mode](spark-standalone.html), without explicitly setting cores for each executor, executors will get all the cores of a worker. In this case, when dynamic allocation enabled, spark will possibly acquire much more executors than expected. When you want to use dynamic allocation in [standalone mode](spark-standalone.html), you are recommended to explicitly set cores for each executor before the issue [SPARK-30299](https://issues.apache.org/jira/browse/SPARK-30299) got fixed.

Review Comment:
   > "...without explicitly setting `spark.executor.cores`..."
   
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r902140262


##########
docs/job-scheduling.md:
##########
@@ -83,6 +83,10 @@ This feature is disabled by default and available on all coarse-grained cluster
 [Mesos coarse-grained mode](running-on-mesos.html#mesos-run-modes) and [K8s mode](running-on-kubernetes.html).
 
 
+### Caveats
+
+- In [standalone mode](spark-standalone.html), without explicitly setting cores for each executor, executors will get all the cores of a worker. In this case, when dynamic allocation enabled, spark will possibly acquire much more executors than expected. When you want to use dynamic allocation in [standalone mode](spark-standalone.html), you are recommended to explicitly set cores for each executor before the issue [SPARK-30299](https://issues.apache.org/jira/browse/SPARK-30299) got fixed.

Review Comment:
   "...without explicitly setting `spark.executor.cores`..."



##########
docs/job-scheduling.md:
##########
@@ -83,6 +83,10 @@ This feature is disabled by default and available on all coarse-grained cluster
 [Mesos coarse-grained mode](running-on-mesos.html#mesos-run-modes) and [K8s mode](running-on-kubernetes.html).
 
 
+### Caveats
+
+- In [standalone mode](spark-standalone.html), without explicitly setting cores for each executor, executors will get all the cores of a worker. In this case, when dynamic allocation enabled, spark will possibly acquire much more executors than expected. When you want to use dynamic allocation in [standalone mode](spark-standalone.html), you are recommended to explicitly set cores for each executor before the issue [SPARK-30299](https://issues.apache.org/jira/browse/SPARK-30299) got fixed.

Review Comment:
   I didn't see where this case is warned in the code. Could you add it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r903253826


##########
core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala:
##########
@@ -63,17 +64,28 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf,
    */
   private[spark] def isSupported(rp: ResourceProfile): Boolean = {
     val isNotDefaultProfile = rp.id != ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
-    val notYarnOrK8sAndNotDefaultProfile = isNotDefaultProfile && !(isYarn || isK8s)
-    val YarnOrK8sNotDynAllocAndNotDefaultProfile =
-      isNotDefaultProfile && (isYarn || isK8s) && !dynamicEnabled
+    val notYarnOrK8sOrStandaloneAndNotDefaultProfile =
+      isNotDefaultProfile && !(isYarn || isK8s || isStandalone)
+    val YarnOrK8sOrStandaloneNotDynAllocAndNotDefaultProfile =
+      isNotDefaultProfile && (isYarn || isK8s || isStandalone) && !dynamicEnabled
     // We want the exception to be thrown only when we are specifically testing for the
     // exception or in a real application. Otherwise in all other testing scenarios we want
     // to skip throwing the exception so that we can test in other modes to make testing easier.
     if ((notRunningUnitTests || testExceptionThrown) &&
-        (notYarnOrK8sAndNotDefaultProfile || YarnOrK8sNotDynAllocAndNotDefaultProfile)) {
+        (notYarnOrK8sOrStandaloneAndNotDefaultProfile ||
+          YarnOrK8sOrStandaloneNotDynAllocAndNotDefaultProfile)) {
       throw new SparkException("ResourceProfiles are only supported on YARN and Kubernetes " +
-        "with dynamic allocation enabled.")
+        "and Standalone with dynamic allocation enabled.")
     }
+
+    if (isStandalone && rp.getExecutorCores.isEmpty &&
+      sparkConf.getOption(config.EXECUTOR_CORES.key).isEmpty) {
+      logWarning(s"Executor cores is not set for resource profile: ${rp.id}, and " +
+        s"spark.executor.cores is also not specified, you may get more executors allocated than " +
+        s"expected. It's recommended to set executor cores explicitly. Check this issue " +

Review Comment:
   nit: "Neither executor cores is set for resource profile, nor spark.executor.cores is explicitly set, you may ..."



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r903254447


##########
core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala:
##########
@@ -111,15 +112,19 @@ private[spark] class StandaloneSchedulerBackend(
     // ExecutorAllocationManager will send the real initial limit to the Master later.
     val initialExecutorLimit =
       if (Utils.isDynamicAllocationEnabled(conf)) {
+        if (coresPerExecutor.isEmpty) {
+          logWarning("Dynamic allocation without explicitly setting spark.executor.cores " +
+            "detected, you may get more executors allocated than expected. It's recommended to " +
+            "set spark.executor.cores explicitly. Check this issue for more details: " +
+            "https://issues.apache.org/jira/browse/SPARK-30299")

Review Comment:
   nit: "Please check [SPARK-30299](https://issues.apache.org/jira/browse/SPARK-30299) for more details."



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r903254945


##########
core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala:
##########
@@ -111,15 +112,19 @@ private[spark] class StandaloneSchedulerBackend(
     // ExecutorAllocationManager will send the real initial limit to the Master later.
     val initialExecutorLimit =
       if (Utils.isDynamicAllocationEnabled(conf)) {
+        if (coresPerExecutor.isEmpty) {
+          logWarning("Dynamic allocation without explicitly setting spark.executor.cores " +
+            "detected, you may get more executors allocated than expected. It's recommended to " +

Review Comment:
   nit: "Dynamic allocation enabled without spark.executor.cores explicitly set, you may ..."



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ivoson commented on pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
ivoson commented on PR #36716:
URL: https://github.com/apache/spark/pull/36716#issuecomment-1172363057

   GA passed after retry. 
   
   Looked like the test suite below failed sometimes.
   org.apache.spark.sql.execution.datasources.parquet.ParquetIOSuite


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] tgravescs commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
tgravescs commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r903853713


##########
core/src/test/scala/org/apache/spark/LocalSparkContext.scala:
##########
@@ -31,6 +31,8 @@ trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self
 
   override def beforeAll(): Unit = {
     super.beforeAll()
+    // Clear default profile, just in case default profile is not cleaned in other suites.
+    ResourceProfile.clearDefaultProfile()

Review Comment:
   it might be nice to look at why and update that suite if its not cleaning up properly. if its something we can't get around then fine.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ivoson commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
ivoson commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r902811828


##########
core/src/test/scala/org/apache/spark/LocalSparkContext.scala:
##########
@@ -31,6 +31,8 @@ trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self
 
   override def beforeAll(): Unit = {
     super.beforeAll()
+    // Clear default profile, just in case default profile is not cleaned in other suites.
+    ResourceProfile.clearDefaultProfile()

Review Comment:
   Yes, without this change, one of the test suites failed because of unexpected default profile.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ivoson commented on pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
ivoson commented on PR #36716:
URL: https://github.com/apache/spark/pull/36716#issuecomment-1140457916

   cc @Ngone51 @jiangxb1987 @xuanyuanking, could you please help review this PR? Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r885358948


##########
core/src/main/scala/org/apache/spark/deploy/master/ResourceDescription.scala:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import org.apache.spark.resource.ResourceRequirement
+
+/**
+ * Describe resource requests for different resource profiles. Used for executor schedule.
+ *
+ * @param coresPerExecutor cores for each executor.
+ * @param memoryMbPerExecutor memory for each executor.
+ * @param customResourcesPerExecutor custom resource requests for each executor.
+ */
+private[spark] case class ResourceDescription(

Review Comment:
   How about `ExecutorResourceDescription`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ivoson commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
ivoson commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r885749761


##########
core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala:
##########
@@ -65,7 +66,70 @@ private[spark] class ApplicationInfo(
     appSource = new ApplicationSource(this)
     nextExecutorId = 0
     removedExecutors = new ArrayBuffer[ExecutorDesc]
-    executorLimit = desc.initialExecutorLimit.getOrElse(Integer.MAX_VALUE)
+    val initialExecutorLimit = desc.initialExecutorLimit.getOrElse(Integer.MAX_VALUE)
+
+    rpIdToResourceProfile = new mutable.HashMap[Int, ResourceProfile]()
+    rpIdToResourceProfile(DEFAULT_RESOURCE_PROFILE_ID) = desc.defaultProfile
+    rpIdToResourceDesc = new mutable.HashMap[Int, ResourceDescription]()
+    createResourceDescForResourceProfile(desc.defaultProfile)
+
+    targetNumExecutorsPerResourceProfileId = new mutable.HashMap[Int, Int]()
+    targetNumExecutorsPerResourceProfileId(DEFAULT_RESOURCE_PROFILE_ID) = initialExecutorLimit

Review Comment:
   Not sure if I get this right. Please also help check this. In [ApplicationInfo](https://github.com/apache/spark/blob/v3.3.0-rc3/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala#L49) we can see that the variable `initialExecutorLimit` is used for dynamic allocation to limit the executor num for the app to start with.
   And when [dynamic allocation is enabled](https://github.com/apache/spark/blob/v3.3.0-rc3/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala#L120), the limit would be 0, otherwise it would be None for now. And the `ApplicationDescription` is created during `SparkContext` initialization. At this time point, we can only have default resource profile. 
   
   For other resource profiles, we rely on [`ExecutorAllocationManager`](https://github.com/apache/spark/blob/v3.3.0-rc3/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L431) to update the requests for different resource profiles.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r885297634


##########
core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala:
##########
@@ -25,10 +25,13 @@ package org.apache.spark.deploy
 private[deploy] class ExecutorDescription(
     val appId: String,
     val execId: Int,
+    val rpId: Int,
     val cores: Int,
+    val memoryMb: Int,

Review Comment:
   Why do we need `memoryMb` now?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r890949860


##########
core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala:
##########
@@ -65,7 +66,70 @@ private[spark] class ApplicationInfo(
     appSource = new ApplicationSource(this)
     nextExecutorId = 0
     removedExecutors = new ArrayBuffer[ExecutorDesc]
-    executorLimit = desc.initialExecutorLimit.getOrElse(Integer.MAX_VALUE)
+    val initialExecutorLimit = desc.initialExecutorLimit.getOrElse(Integer.MAX_VALUE)
+
+    rpIdToResourceProfile = new mutable.HashMap[Int, ResourceProfile]()
+    rpIdToResourceProfile(DEFAULT_RESOURCE_PROFILE_ID) = desc.defaultProfile
+    rpIdToResourceDesc = new mutable.HashMap[Int, ExecutorResourceDescription]()
+    createResourceDescForResourceProfile(desc.defaultProfile)
+
+    targetNumExecutorsPerResourceProfileId = new mutable.HashMap[Int, Int]()
+    targetNumExecutorsPerResourceProfileId(DEFAULT_RESOURCE_PROFILE_ID) = initialExecutorLimit
+
+    executorsPerResourceProfileId = new mutable.HashMap[Int, mutable.Set[Int]]()
+  }
+
+  private[deploy] def getOrUpdateExecutorsForRPId(rpId: Int): mutable.Set[Int] = {
+    executorsPerResourceProfileId.getOrElseUpdate(rpId, mutable.HashSet[Int]())
+  }
+
+  private[deploy] def getTargetExecutorNumForRPId(rpId: Int): Int = {
+    targetNumExecutorsPerResourceProfileId.getOrElse(rpId, 0)
+  }
+
+  private[deploy] def getRequestedRPIds(): Seq[Int] = {
+    rpIdToResourceProfile.keys.toSeq.sorted
+  }
+
+  private def createResourceDescForResourceProfile(resourceProfile: ResourceProfile): Unit = {
+    if (!rpIdToResourceDesc.contains(resourceProfile.id)) {
+      val defaultMemoryMbPerExecutor = desc.memoryPerExecutorMB
+      val defaultCoresPerExecutor = desc.coresPerExecutor
+      val coresPerExecutor = resourceProfile.getExecutorCores
+        .orElse(defaultCoresPerExecutor)
+      val memoryMbPerExecutor = resourceProfile.getExecutorMemory
+        .map(_.toInt)
+        .getOrElse(defaultMemoryMbPerExecutor)
+      val customResources = ResourceUtils.executorResourceRequestToRequirement(
+        getCustomExecutorResources(resourceProfile).values.toSeq)
+
+      rpIdToResourceDesc(resourceProfile.id) =
+        ExecutorResourceDescription(coresPerExecutor, memoryMbPerExecutor, customResources)
+    }
+  }
+
+  // Get resources required for schedule.
+  private[deploy] def getResourceDescriptionForRpId(rpId: Int): ExecutorResourceDescription = {
+    rpIdToResourceDesc(rpId)
+  }
+
+  private[deploy] def requestExecutors(
+      resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Unit = {
+    resourceProfileToTotalExecs.foreach { case (rp, num) =>
+      createResourceDescForResourceProfile(rp)
+
+      if (!rpIdToResourceProfile.contains(rp.id)) {
+        rpIdToResourceProfile(rp.id) = rp
+      }
+
+      if (!targetNumExecutorsPerResourceProfileId.get(rp.id).contains(num)) {
+        targetNumExecutorsPerResourceProfileId(rp.id) = num
+      }

Review Comment:
   How about:
   ```suggestion
         targetNumExecutorsPerResourceProfileId(rp.id) = num
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r891171201


##########
core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala:
##########
@@ -530,6 +535,87 @@ class MasterSuite extends SparkFunSuite
     schedulingWithEverything(spreadOut = false)
   }
 
+  test("scheduling for app with multiple resource profiles") {
+    scheduleExecutorsForAppWithMultiRPs(withMaxCores = false)
+  }
+
+  test("scheduling for app with multiple resource profiles with max cores") {
+    scheduleExecutorsForAppWithMultiRPs(withMaxCores = true)
+  }
+
+  private def scheduleExecutorsForAppWithMultiRPs(withMaxCores: Boolean): Unit = {
+    val appInfo: ApplicationInfo = if (withMaxCores) {
+      makeAppInfo(
+      1024, maxCores = Some(30), initialExecutorLimit = Some(0))

Review Comment:
   nit: 2 indents?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ivoson commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
ivoson commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r893595106


##########
core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala:
##########
@@ -19,23 +19,28 @@ package org.apache.spark.deploy
 
 import java.net.URI
 
-import org.apache.spark.resource.ResourceRequirement
+import org.apache.spark.resource.{ResourceProfile, ResourceRequirement, ResourceUtils}
+import org.apache.spark.resource.ResourceProfile.getCustomExecutorResources
 
 private[spark] case class ApplicationDescription(
     name: String,
     maxCores: Option[Int],
-    memoryPerExecutorMB: Int,
     command: Command,
     appUiUrl: String,
+    defaultProfile: ResourceProfile,
     eventLogDir: Option[URI] = None,
     // short name of compression codec used when writing event logs, if any (e.g. lzf)
     eventLogCodec: Option[String] = None,
-    coresPerExecutor: Option[Int] = None,
     // number of executors this application wants to start with,
     // only used if dynamic allocation is enabled
     initialExecutorLimit: Option[Int] = None,
-    user: String = System.getProperty("user.name", "<unknown>"),
-    resourceReqsPerExecutor: Seq[ResourceRequirement] = Seq.empty) {
+    user: String = System.getProperty("user.name", "<unknown>")) {
+
+  def memoryPerExecutorMB: Int = defaultProfile.getExecutorMemory.map(_.toInt).getOrElse(1024)
+  def coresPerExecutor: Option[Int] = defaultProfile.getExecutorCores
+  def resourceReqsPerExecutor: Seq[ResourceRequirement] =
+    ResourceUtils.executorResourceRequestToRequirement(
+      getCustomExecutorResources(defaultProfile).values.toSeq.sortBy(_.resourceName))

Review Comment:
   Sure, will make the change. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] tgravescs commented on pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
tgravescs commented on PR #36716:
URL: https://github.com/apache/spark/pull/36716#issuecomment-1148763057

   >  The feature is enabled when dynamic allocation enabled in standalone cluster.
   
   So last time I checked dynamic allocation in standalone mode had issues.  Have this been addressed or do you not see those issues?  one such issue was https://issues.apache.org/jira/browse/SPARK-30299?filter=-2 but I thought there was others.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r891178775


##########
core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala:
##########
@@ -530,6 +535,87 @@ class MasterSuite extends SparkFunSuite
     schedulingWithEverything(spreadOut = false)
   }
 
+  test("scheduling for app with multiple resource profiles") {
+    scheduleExecutorsForAppWithMultiRPs(withMaxCores = false)
+  }
+
+  test("scheduling for app with multiple resource profiles with max cores") {
+    scheduleExecutorsForAppWithMultiRPs(withMaxCores = true)
+  }
+
+  private def scheduleExecutorsForAppWithMultiRPs(withMaxCores: Boolean): Unit = {
+    val appInfo: ApplicationInfo = if (withMaxCores) {
+      makeAppInfo(
+      1024, maxCores = Some(30), initialExecutorLimit = Some(0))
+    } else {
+      makeAppInfo(
+        1024, maxCores = None, initialExecutorLimit = Some(0))
+    }
+
+    val master = makeAliveMaster()
+    val conf = new SparkConf()
+    val workers = (1 to 4).map { idx =>
+      val worker = new MockWorker(master.self, conf)
+      worker.rpcEnv.setupEndpoint(s"worker-$idx", worker)
+      val workerReg = RegisterWorker(
+        worker.id,
+        "localhost",
+        worker.self.address.port,
+        worker.self,
+        10,
+        4096,
+        "http://localhost:8080",
+        RpcAddress("localhost", 10000))
+      master.self.send(workerReg)
+      worker
+    }
+
+    // Register app and schedule.
+    master.registerApplication(appInfo)
+    startExecutorsOnWorkers(master)
+    assert(appInfo.executors.isEmpty)
+
+    // Request executors with multiple resource profile.
+    val rp1 = DeployTestUtils.createResourceProfile(Some(2048), Map.empty, Some(5))
+    val rp2 = DeployTestUtils.createResourceProfile(Some(2048), Map.empty, Some(10))

Review Comment:
   Could you also test the case where no worker can satisfy the resource profile ? In this case, no executor for that rp should be launched.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ivoson commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
ivoson commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r893568975


##########
core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala:
##########
@@ -107,11 +107,11 @@ object JsonConstants {
       |{"id":"id","starttime":3,"name":"name",
       |"cores":0,"user":"%s",
       |"memoryperexecutor":1234,
-      |"resourcesperexecutor":[{"name":"gpu",
-      |"amount":3},{"name":"fpga","amount":3}],
+      |"resourcesperexecutor":[{"name":"fpga",
+      |"amount":3},{"name":"gpu","amount":3}],

Review Comment:
   As also mentioned above, this is caused by the sort of custom executor resources in `ApplicationDescription`
   
   ```
   def resourceReqsPerExecutor: Seq[ResourceRequirement] =
       ResourceUtils.executorResourceRequestToRequirement(
         getCustomExecutorResources(defaultProfile).values.toSeq.sortBy(_.resourceName))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ivoson commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
ivoson commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r893571356


##########
core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala:
##########
@@ -530,6 +535,87 @@ class MasterSuite extends SparkFunSuite
     schedulingWithEverything(spreadOut = false)
   }
 
+  test("scheduling for app with multiple resource profiles") {
+    scheduleExecutorsForAppWithMultiRPs(withMaxCores = false)
+  }
+
+  test("scheduling for app with multiple resource profiles with max cores") {
+    scheduleExecutorsForAppWithMultiRPs(withMaxCores = true)
+  }
+
+  private def scheduleExecutorsForAppWithMultiRPs(withMaxCores: Boolean): Unit = {
+    val appInfo: ApplicationInfo = if (withMaxCores) {
+      makeAppInfo(
+      1024, maxCores = Some(30), initialExecutorLimit = Some(0))

Review Comment:
   Thanks, will fix.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ivoson commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
ivoson commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r893570128


##########
core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala:
##########
@@ -299,9 +300,10 @@ private[spark] class StandaloneAppClient(
    *
    * @return whether the request is acknowledged.
    */
-  def requestTotalExecutors(requestedTotal: Int): Future[Boolean] = {

Review Comment:
   Sounds good, will make the change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r890973724


##########
core/src/main/scala/org/apache/spark/deploy/master/ExecutorResourceDescription.scala:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import org.apache.spark.resource.ResourceRequirement
+
+/**
+ * Describe resource requests for different resource profiles. Used for executor schedule.
+ *
+ * @param coresPerExecutor cores for each executor.
+ * @param memoryMbPerExecutor memory for each executor.
+ * @param customResourcesPerExecutor custom resource requests for each executor.

Review Comment:
   nit: "resource requests" -> "resource requirements"
   
   (I think we also have `ExecutorResourceRequest` so it's good to distugish them carefully.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r890912848


##########
core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala:
##########
@@ -19,23 +19,28 @@ package org.apache.spark.deploy
 
 import java.net.URI
 
-import org.apache.spark.resource.ResourceRequirement
+import org.apache.spark.resource.{ResourceProfile, ResourceRequirement, ResourceUtils}
+import org.apache.spark.resource.ResourceProfile.getCustomExecutorResources
 
 private[spark] case class ApplicationDescription(
     name: String,
     maxCores: Option[Int],
-    memoryPerExecutorMB: Int,
     command: Command,
     appUiUrl: String,
+    defaultProfile: ResourceProfile,
     eventLogDir: Option[URI] = None,
     // short name of compression codec used when writing event logs, if any (e.g. lzf)
     eventLogCodec: Option[String] = None,
-    coresPerExecutor: Option[Int] = None,
     // number of executors this application wants to start with,
     // only used if dynamic allocation is enabled
     initialExecutorLimit: Option[Int] = None,
-    user: String = System.getProperty("user.name", "<unknown>"),
-    resourceReqsPerExecutor: Seq[ResourceRequirement] = Seq.empty) {
+    user: String = System.getProperty("user.name", "<unknown>")) {
+
+  def memoryPerExecutorMB: Int = defaultProfile.getExecutorMemory.map(_.toInt).getOrElse(1024)
+  def coresPerExecutor: Option[Int] = defaultProfile.getExecutorCores
+  def resourceReqsPerExecutor: Seq[ResourceRequirement] =
+    ResourceUtils.executorResourceRequestToRequirement(
+      getCustomExecutorResources(defaultProfile).values.toSeq.sortBy(_.resourceName))

Review Comment:
   Is `sortBy(_.resourceName)` necessary? I didn't see we sort it in `ApplicationInfo`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r891166646


##########
core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala:
##########
@@ -299,9 +300,10 @@ private[spark] class StandaloneAppClient(
    *
    * @return whether the request is acknowledged.
    */
-  def requestTotalExecutors(requestedTotal: Int): Future[Boolean] = {

Review Comment:
   How about leaving this method as it is and delagtes to `requestTotalExecutors(Map(default resource profile -> requestedTotal))` instead? So that some tests remains unchanged.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r903252766


##########
core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala:
##########
@@ -63,17 +64,28 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf,
    */
   private[spark] def isSupported(rp: ResourceProfile): Boolean = {
     val isNotDefaultProfile = rp.id != ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
-    val notYarnOrK8sAndNotDefaultProfile = isNotDefaultProfile && !(isYarn || isK8s)
-    val YarnOrK8sNotDynAllocAndNotDefaultProfile =
-      isNotDefaultProfile && (isYarn || isK8s) && !dynamicEnabled
+    val notYarnOrK8sOrStandaloneAndNotDefaultProfile =
+      isNotDefaultProfile && !(isYarn || isK8s || isStandalone)
+    val YarnOrK8sOrStandaloneNotDynAllocAndNotDefaultProfile =
+      isNotDefaultProfile && (isYarn || isK8s || isStandalone) && !dynamicEnabled
     // We want the exception to be thrown only when we are specifically testing for the
     // exception or in a real application. Otherwise in all other testing scenarios we want
     // to skip throwing the exception so that we can test in other modes to make testing easier.
     if ((notRunningUnitTests || testExceptionThrown) &&
-        (notYarnOrK8sAndNotDefaultProfile || YarnOrK8sNotDynAllocAndNotDefaultProfile)) {
+        (notYarnOrK8sOrStandaloneAndNotDefaultProfile ||
+          YarnOrK8sOrStandaloneNotDynAllocAndNotDefaultProfile)) {
       throw new SparkException("ResourceProfiles are only supported on YARN and Kubernetes " +
-        "with dynamic allocation enabled.")
+        "and Standalone with dynamic allocation enabled.")
     }
+
+    if (isStandalone && rp.getExecutorCores.isEmpty &&
+      sparkConf.getOption(config.EXECUTOR_CORES.key).isEmpty) {
+      logWarning(s"Executor cores is not set for resource profile: ${rp.id}, and " +
+        s"spark.executor.cores is also not specified, you may get more executors allocated than " +
+        s"expected. It's recommended to set executor cores explicitly. Check this issue " +
+        s"for more details: https://issues.apache.org/jira/browse/SPARK-30299")

Review Comment:
   nit: "Please check SPARK-30299 for more details."



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ivoson commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
ivoson commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r902832247


##########
core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala:
##########
@@ -54,6 +54,7 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf,
   private val master = sparkConf.getOption("spark.master")
   private val isYarn = master.isDefined && master.get.equals("yarn")
   private val isK8s = master.isDefined && master.get.startsWith("k8s://")
+  private val isStandalone = master.isDefined && master.get.startsWith("spark://")

Review Comment:
   Here we only try to support standalone cluster here. The reason for the local-cluster code change in `ResourceProfile` is that local-cluster and standalone share the same `StandaloneSchedulerBackend` and Master. After the changes in this PR, the master will schedule based on resource profile, so we also need to create default resource profile for local-cluster. Does this make sense?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] tgravescs commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
tgravescs commented on code in PR #36716:
URL: https://github.com/apache/spark/pull/36716#discussion_r903859600


##########
core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala:
##########
@@ -336,9 +340,23 @@ object ResourceProfile extends Logging {
 
   private def getDefaultExecutorResources(conf: SparkConf): Map[String, ExecutorResourceRequest] = {
     val ereqs = new ExecutorResourceRequests()
-    val cores = conf.get(EXECUTOR_CORES)
-    ereqs.cores(cores)
-    val memory = conf.get(EXECUTOR_MEMORY)
+
+    val isStandalone = conf.getOption("spark.master").exists(_.startsWith("spark://"))
+    val isLocalCluster = conf.getOption("spark.master").exists(_.startsWith("local-cluster"))
+    // By default, standalone executors take all available cores, do not have a specific value.
+    val cores = if (isStandalone || isLocalCluster) {
+      conf.getOption(EXECUTOR_CORES.key).map(_.toInt)
+    } else {
+      Some(conf.get(EXECUTOR_CORES))
+    }
+    cores.foreach(ereqs.cores)
+
+    // Setting all resources here, cluster managers will take the resources they respect.

Review Comment:
   I guess so, I don't really see the purpose of the comment, I would expect all resources to be in the resourceProfile.  What the cluster manager does with those isn't really the concern of this class.  If there was something that shouldn't be set for some cluster manager, I can see commenting on that, but I would hope we would just update the cluster manager to do the right thing.  I would just remove the comment



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on PR #36716:
URL: https://github.com/apache/spark/pull/36716#issuecomment-1141366692

   Can one of the admins verify this patch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org