Posted to commits@kyuubi.apache.org by ul...@apache.org on 2023/04/04 09:07:13 UTC

[kyuubi] branch master updated: [KYUUBI #4664] Fix empty relation when kill executors

This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 061545b2b [KYUUBI #4664] Fix empty relation when kill executors
061545b2b is described below

commit 061545b2bd632586d8d8367ff6d90dea3949e52f
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Tue Apr 4 17:06:57 2023 +0800

    [KYUUBI #4664] Fix empty relation when kill executors
    
    ### _Why are the changes needed?_
    
    This PR fixes a corner case when repartitioning on a local relation, e.g.,
    ```
    Repartition
          |
    LocalRelation
    ```
    
    It would throw an exception since no shuffle actually happens:
    ```
    java.util.NoSuchElementException: key not found: 3
            at scala.collection.MapLike.default(MapLike.scala:235)
            at scala.collection.MapLike.default$(MapLike.scala:234)
            at scala.collection.AbstractMap.default(Map.scala:63)
            at scala.collection.MapLike.apply(MapLike.scala:144)
            at scala.collection.MapLike.apply$(MapLike.scala:143)
            at scala.collection.AbstractMap.apply(Map.scala:63)
            at org.apache.spark.sql.FinalStageResourceManager.findExecutorToKill(FinalStageResourceManager.scala:122)
            at org.apache.spark.sql.FinalStageResourceManager.killExecutors(FinalStageResourceManager.scala:175)
    ```
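    
    As a rough illustration (not part of the patch; the object name and query below are hypothetical), the plan shape above can arise from repartitioning an empty local DataFrame, which Spark can plan without materializing a real shuffle:
    
    ```
    import org.apache.spark.sql.SparkSession
    
    object EmptyRelationRepro {
      def main(args: Array[String]): Unit = {
        // Local session just for illustration; the failure itself only shows up
        // with the Kyuubi final-stage extension enabled.
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("empty-relation-repartition")
          .getOrCreate()
        import spark.implicits._
    
        // Repartition on top of an (empty) local relation: Repartition -> LocalRelation.
        val df = Seq.empty[(Int, String)].toDF("id", "name").repartition(3)
        df.collect()
    
        spark.stop()
      }
    }
    ```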
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4664 from ulysses-you/kill-executors-followup.
    
    Closes #4664
    
    3811eaee9 [ulysses-you] Fix empty relation
    
    Authored-by: ulysses-you <ul...@gmail.com>
    Signed-off-by: ulyssesyou <ul...@apache.org>
---
 .../org/apache/spark/sql/FinalStageResourceManager.scala     | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala
index 2bf7ae6b7..ca3f762e1 100644
--- a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala
@@ -69,6 +69,7 @@ case class FinalStageResourceManager(session: SparkSession)
       return plan
     }
 
+    // TODO: move this to query stage optimizer when updating Spark to 3.5.x
     // Since we are in `prepareQueryStage`, the AQE shuffle read has not been applied.
     // So we need to apply it by self.
     val shuffleRead = queryStageOptimizerRules.foldLeft(stageOpt.get.asInstanceOf[SparkPlan]) {
@@ -119,7 +120,11 @@ case class FinalStageResourceManager(session: SparkSession)
       shuffleId: Int,
       numReduce: Int): Seq[String] = {
     val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
-    val shuffleStatus = tracker.shuffleStatuses(shuffleId)
+    val shuffleStatusOpt = tracker.shuffleStatuses.get(shuffleId)
+    if (shuffleStatusOpt.isEmpty) {
+      return Seq.empty
+    }
+    val shuffleStatus = shuffleStatusOpt.get
     val executorToBlockSize = new mutable.HashMap[String, Long]
     shuffleStatus.withMapStatuses { mapStatus =>
       mapStatus.foreach { status =>
@@ -175,6 +180,9 @@ case class FinalStageResourceManager(session: SparkSession)
     val executorsToKill = findExecutorToKill(sc, targetExecutors, shuffleId, numReduce)
     logInfo(s"Request to kill executors, total count ${executorsToKill.size}, " +
       s"[${executorsToKill.mkString(", ")}].")
+    if (executorsToKill.isEmpty) {
+      return
+    }
 
     // Note, `SparkContext#killExecutors` does not allow with DRA enabled,
     // see `https://github.com/apache/spark/pull/20604`.
@@ -201,7 +209,7 @@ trait FinalRebalanceStageHelper {
       case f: FilterExec => findFinalRebalanceStage(f.child)
       case s: SortExec if !s.global => findFinalRebalanceStage(s.child)
       case stage: ShuffleQueryStageExec
-          if stage.isMaterialized &&
+          if stage.isMaterialized && stage.mapStats.isDefined &&
             stage.plan.isInstanceOf[ShuffleExchangeExec] &&
             stage.plan.asInstanceOf[ShuffleExchangeExec].shuffleOrigin != ENSURE_REQUIREMENTS =>
         Some(stage)
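
As a standalone sketch of the pattern the patch applies (illustrative only; `lookupShuffleExecutors` and the simplified `ShuffleStatus` case class below are hypothetical stand-ins, not Kyuubi or Spark APIs): the lookup goes through `Map.get` instead of `Map.apply` and returns early when the shuffle id was never registered, and the caller likewise returns early on an empty result. The same idea motivates the added `stage.mapStats.isDefined` guard, which skips stages that produced no map output.

```
object DefensiveLookupSketch {
  // Hypothetical stand-in for Spark's per-shuffle bookkeeping.
  final case class ShuffleStatus(executorToBlockSize: Map[String, Long])

  def lookupShuffleExecutors(
      shuffleStatuses: Map[Int, ShuffleStatus],
      shuffleId: Int): Seq[String] = {
    // Before the fix: shuffleStatuses(shuffleId) throws NoSuchElementException
    // when the key is absent (e.g. the plan collapsed to an empty relation).
    val shuffleStatusOpt = shuffleStatuses.get(shuffleId)
    if (shuffleStatusOpt.isEmpty) {
      // No shuffle was materialized for this id, so there is nothing to kill.
      return Seq.empty
    }
    shuffleStatusOpt.get.executorToBlockSize.keys.toSeq
  }

  def main(args: Array[String]): Unit = {
    val statuses = Map(1 -> ShuffleStatus(Map("exec-1" -> 1024L)))
    assert(lookupShuffleExecutors(statuses, 1) == Seq("exec-1"))
    assert(lookupShuffleExecutors(statuses, 3).isEmpty) // key 3 previously threw
  }
}
```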