You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/03/10 16:03:20 UTC

[GitHub] [spark] tgravescs commented on a change in pull request #33941: [SPARK-36699][Core] Reuse compatible executors for stage-level scheduling

tgravescs commented on a change in pull request #33941:
URL: https://github.com/apache/spark/pull/33941#discussion_r823831564



##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -502,14 +511,27 @@ private[spark] class ExecutorAllocationManager(
    */
   private def addExecutors(maxNumExecutorsNeeded: Int, rpId: Int): Int = {
     val oldNumExecutorsTarget = numExecutorsTargetPerResourceProfileId(rpId)
-    // Do not request more executors if it would put our target over the upper bound
-    // this is doing a max check per ResourceProfile
-    if (oldNumExecutorsTarget >= maxNumExecutors) {
-      logDebug("Not adding executors because our current target total " +
-        s"is already ${oldNumExecutorsTarget} (limit $maxNumExecutors)")
-      numExecutorsToAddPerResourceProfileId(rpId) = 1
-      return 0
+
+    if (!reuseExecutors) {
+      // Do not request more executors if it would put our target over the upper bound
+      // this is doing a max check per ResourceProfile
+      if (oldNumExecutorsTarget >= maxNumExecutors) {
+        logDebug("Not adding executors because our current target total " +
+          s"is already ${oldNumExecutorsTarget} (limit $maxNumExecutors)")
+        numExecutorsToAddPerResourceProfileId(rpId) = 1
+        return 0
+      }
+    } else {
+      val numCompatibleExecutors = numExecutorsTargetsCompatibleProfiles(rpId)
+      if (oldNumExecutorsTarget +  numCompatibleExecutors >= maxNumExecutors) {
+        logDebug("Not adding executors because our current target total is already " +
+          s"${oldNumExecutorsTarget} (limit: max $maxNumExecutors - " +
+          s"reused $numCompatibleExecutors)")

Review comment:
       Please format this differently, e.g. "(limit: ...) (reused: ...)".

##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -518,11 +540,25 @@ private[spark] class ExecutorAllocationManager(
     numExecutorsTarget += numExecutorsToAddPerResourceProfileId(rpId)
     // Ensure that our target doesn't exceed what we need at the present moment:
     numExecutorsTarget = math.min(numExecutorsTarget, maxNumExecutorsNeeded)
-    // Ensure that our target fits within configured bounds:
-    numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors)
+    numExecutorsTarget = if (!reuseExecutors) {
+      // Ensure that our target fits within configured bounds:
+      math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors)
+    } else {
+      // Ensure that our target fits within adjusted bounds:
+      val numCompatibleExecutors = numExecutorsTargetsCompatibleProfiles(rpId)
+      val adjustedMinNumExecutors = math.max(0, minNumExecutors - numCompatibleExecutors)
+      val adjustedMaxNumExecutors = math.max(1, maxNumExecutors - numCompatibleExecutors)

Review comment:
       This part doesn't make sense to me on first reading. If we set the min and max, our target should still fit within those bounds, not within adjusted ones. I get that you are trying to say that, once the compatible executors are included, the total stays within that limit, but I think that is hard to understand from this code. This would also get more complicated if the reuse policy could change within an application.

##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -297,11 +299,13 @@ private[spark] class ExecutorAllocationManager(
     val numRunningOrPendingTasks = pendingTask + pendingSpeculative + running
     val rp = resourceProfileManager.resourceProfileFromId(rpId)
     val tasksPerExecutor = rp.maxTasksPerExecutor(conf)
-    logDebug(s"max needed for rpId: $rpId numpending: $numRunningOrPendingTasks," +
-      s" tasksperexecutor: $tasksPerExecutor")
+
     val maxNeeded = math.ceil(numRunningOrPendingTasks * executorAllocationRatio /
       tasksPerExecutor).toInt
 
+    logDebug(s"max needed for rpId: $rpId numpending: $numRunningOrPendingTasks," +
+      s" tasksperexecutor: $tasksPerExecutor = $maxNeeded")

Review comment:
       We should change this to be more descriptive, even just "maxNeeded: $maxNeeded".

##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -1958,6 +1958,12 @@ package object config {
       .timeConf(TimeUnit.MILLISECONDS)
       .createOptional
 
+  private[spark] val SCHEDULER_REUSE_COMPATIBLE_EXECUTORS =
+    ConfigBuilder("spark.scheduler.reuseCompatibleExecutors")
+      .version("3.3.0")

Review comment:
       This needs a .doc added to it. I also need to think about its name.

##########
File path: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
##########
@@ -283,8 +283,9 @@ case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
 case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent
 
 @DeveloperApi
-@Since("3.1.0")
-case class SparkListenerResourceProfileAdded(resourceProfile: ResourceProfile)
+@Since("3.3.0")
+case class SparkListenerResourceProfileAdded(resourceProfile: ResourceProfile,

Review comment:
       I would prefer not to change this interface, as people use it. I'd rather create a new one if needed, but I need to look at how it's all used. If it's just so the environment page can show compatible profiles, I'm not sure it's worth it. We can come back to this once we figure out the main logic.

##########
File path: core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
##########
@@ -77,13 +85,27 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf,
     true
   }
 
+  def updateResourceReusePolicy(resourceNames: Set[String],
+    policy: ResourceProfileCompatiblePolicy): Unit = {
+    writeLock.lock()
+    try {
+      reuseResourceNames = resourceNames
+      reusePolicy = policy

Review comment:
       I must be missing where this is used. Or did you want feedback first?

##########
File path: core/src/main/scala/org/apache/spark/rdd/RDD.scala
##########
@@ -1809,6 +1810,22 @@ abstract class RDD[T: ClassTag](
     this
   }
 
+  /**
+   * Specify a ResourceProfile and reuse existing compatible executors to use when calculating
+   * this RDD.
+   * @param reuseResourceNames specify what resource should be checked when reusing executors
+   * @param reusePolicy specify executor reuse policy

Review comment:
       I would rather see ResourceProfileCompatiblePolicy as a public interface that users could implement with their own policies. We can provide a couple of very basic ones, such as equals and AllGreater, or something like that.
   Also, while it's more flexible to do this per stage, I think it also complicates the allocation strategy for dynamic allocation.
   We would also need to know whether this policy allows using this feature without dynamic allocation, because theoretically, if you are reusing executors with the same executor profile but different task requirements, you wouldn't need dynamic allocation.

##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -502,14 +511,27 @@ private[spark] class ExecutorAllocationManager(
    */
   private def addExecutors(maxNumExecutorsNeeded: Int, rpId: Int): Int = {
     val oldNumExecutorsTarget = numExecutorsTargetPerResourceProfileId(rpId)
-    // Do not request more executors if it would put our target over the upper bound
-    // this is doing a max check per ResourceProfile
-    if (oldNumExecutorsTarget >= maxNumExecutors) {
-      logDebug("Not adding executors because our current target total " +
-        s"is already ${oldNumExecutorsTarget} (limit $maxNumExecutors)")
-      numExecutorsToAddPerResourceProfileId(rpId) = 1
-      return 0
+
+    if (!reuseExecutors) {
+      // Do not request more executors if it would put our target over the upper bound
+      // this is doing a max check per ResourceProfile
+      if (oldNumExecutorsTarget >= maxNumExecutors) {
+        logDebug("Not adding executors because our current target total " +
+          s"is already ${oldNumExecutorsTarget} (limit $maxNumExecutors)")
+        numExecutorsToAddPerResourceProfileId(rpId) = 1
+        return 0
+      }
+    } else {
+      val numCompatibleExecutors = numExecutorsTargetsCompatibleProfiles(rpId)
+      if (oldNumExecutorsTarget +  numCompatibleExecutors >= maxNumExecutors) {

Review comment:
       nit: extra space after "+".

##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -384,9 +386,18 @@ private[spark] class TaskSchedulerImpl(
       val execId = shuffledOffers(i).executorId
       val host = shuffledOffers(i).host
       val taskSetRpID = taskSet.taskSet.resourceProfileId
-      // make the resource profile id a hard requirement for now - ie only put tasksets
-      // on executors where resource profile exactly matches.
-      if (taskSetRpID == shuffledOffers(i).resourceProfileId) {
+
+      val assignTasks = if (reuseExecutors) {
+        val compatibleProfiles = sc.resourceProfileManager.getCompatibleProfileIds(taskSetRpID)
+        taskSetRpID == shuffledOffers(i).resourceProfileId ||
+          compatibleProfiles.contains(shuffledOffers(i).resourceProfileId)
+        } else {

Review comment:
       nit: indentation.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org