You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ul...@apache.org on 2023/04/04 09:07:13 UTC
[kyuubi] branch master updated: [KYUUBI #4664] Fix empty relation when kill executors
This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 061545b2b [KYUUBI #4664] Fix empty relation when kill executors
061545b2b is described below
commit 061545b2bd632586d8d8367ff6d90dea3949e52f
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Tue Apr 4 17:06:57 2023 +0800
[KYUUBI #4664] Fix empty relation when kill executors
### _Why are the changes needed?_
This PR fixes a corner case when repartitioning a local relation, e.g.,
```
Repartition
|
LocalRelation
```
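Killing executors then throws an exception. No shuffle actually happens, so the shuffle ID is never registered in the `MapOutputTrackerMaster`'s `shuffleStatuses`, and looking it up fails: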
```
java.util.NoSuchElementException: key not found: 3
at scala.collection.MapLike.default(MapLike.scala:235)
at scala.collection.MapLike.default$(MapLike.scala:234)
at scala.collection.AbstractMap.default(Map.scala:63)
at scala.collection.MapLike.apply(MapLike.scala:144)
at scala.collection.MapLike.apply$(MapLike.scala:143)
at scala.collection.AbstractMap.apply(Map.scala:63)
at org.apache.spark.sql.FinalStageResourceManager.findExecutorToKill(FinalStageResourceManager.scala:122)
at org.apache.spark.sql.FinalStageResourceManager.killExecutors(FinalStageResourceManager.scala:175)
```
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #4664 from ulysses-you/kill-executors-followup.
Closes #4664
3811eaee9 [ulysses-you] Fix empty relation
Authored-by: ulysses-you <ul...@gmail.com>
Signed-off-by: ulyssesyou <ul...@apache.org>
---
.../org/apache/spark/sql/FinalStageResourceManager.scala | 12 ++++++++++--
1 file changed, 10 insertions(+), 2 deletions(-)
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala
index 2bf7ae6b7..ca3f762e1 100644
--- a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala
@@ -69,6 +69,7 @@ case class FinalStageResourceManager(session: SparkSession)
return plan
}
+ // TODO: move this to query stage optimizer when updating Spark to 3.5.x
// Since we are in `prepareQueryStage`, the AQE shuffle read has not been applied.
// So we need to apply it by self.
val shuffleRead = queryStageOptimizerRules.foldLeft(stageOpt.get.asInstanceOf[SparkPlan]) {
@@ -119,7 +120,11 @@ case class FinalStageResourceManager(session: SparkSession)
shuffleId: Int,
numReduce: Int): Seq[String] = {
val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
- val shuffleStatus = tracker.shuffleStatuses(shuffleId)
+ val shuffleStatusOpt = tracker.shuffleStatuses.get(shuffleId)
+ if (shuffleStatusOpt.isEmpty) {
+ return Seq.empty
+ }
+ val shuffleStatus = shuffleStatusOpt.get
val executorToBlockSize = new mutable.HashMap[String, Long]
shuffleStatus.withMapStatuses { mapStatus =>
mapStatus.foreach { status =>
@@ -175,6 +180,9 @@ case class FinalStageResourceManager(session: SparkSession)
val executorsToKill = findExecutorToKill(sc, targetExecutors, shuffleId, numReduce)
logInfo(s"Request to kill executors, total count ${executorsToKill.size}, " +
s"[${executorsToKill.mkString(", ")}].")
+ if (executorsToKill.isEmpty) {
+ return
+ }
// Note, `SparkContext#killExecutors` does not allow with DRA enabled,
// see `https://github.com/apache/spark/pull/20604`.
@@ -201,7 +209,7 @@ trait FinalRebalanceStageHelper {
case f: FilterExec => findFinalRebalanceStage(f.child)
case s: SortExec if !s.global => findFinalRebalanceStage(s.child)
case stage: ShuffleQueryStageExec
- if stage.isMaterialized &&
+ if stage.isMaterialized && stage.mapStats.isDefined &&
stage.plan.isInstanceOf[ShuffleExchangeExec] &&
stage.plan.asInstanceOf[ShuffleExchangeExec].shuffleOrigin != ENSURE_REQUIREMENTS =>
Some(stage)