Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/06/13 23:46:14 UTC

[GitHub] [spark] agrawaldevesh commented on a change in pull request #28818: [WIP][SPARK-31198][CORE] Use graceful decommissioning as part of dynamic scaling

agrawaldevesh commented on a change in pull request #28818:
URL: https://github.com/apache/spark/pull/28818#discussion_r439775571



##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -497,6 +453,70 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
   protected def minRegisteredRatio: Double = _minRegisteredRatio
 
+  /**
+   * Request that the cluster manager decommission the specified executors.
+   * Default implementation delegates to kill, scheduler must override
+   * if it supports graceful decommissioning.

Review comment:
       Is this comment valid? I don't see this method being overridden.

##########
File path: core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
##########
@@ -333,11 +335,19 @@ private[spark] class ExecutorMonitor(
   }
 
   override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
-    if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) {
-      return
-    }
     val exec = ensureExecutorIsTracked(event.blockUpdatedInfo.blockManagerId.executorId,
       UNKNOWN_RESOURCE_PROFILE_ID)
+
+    // Check if it is a shuffle file, or RDD to pick the correct codepath for update
+    if (event.blockUpdatedInfo.blockId.isInstanceOf[ShuffleDataBlockId] && shuffleTrackingEnabled) {
+      event.blockUpdatedInfo.blockId match {
+        case ShuffleDataBlockId(shuffleId, _, _) => exec.addShuffle(shuffleId)
+        case _ => // For now we only update on data blocks
+      }

Review comment:
       Is this change related to the purpose of 'Using decommissioning for dynamic allocation'? I don't fully understand why this is done with respect to just the decommissioning logic. Perhaps this is about SPARK-31974 (https://issues.apache.org/jira/browse/SPARK-31974)?



