You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/04/10 08:36:59 UTC
[spark] branch branch-3.0 updated: [SPARK-31384][SQL] Fix NPE in
OptimizeSkewedJoin
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 6c9827f [SPARK-31384][SQL] Fix NPE in OptimizeSkewedJoin
6c9827f is described below
commit 6c9827f5c20ba2d477eab28288039abb2baf9324
Author: yi.wu <yi...@databricks.com>
AuthorDate: Fri Apr 10 08:16:48 2020 +0000
[SPARK-31384][SQL] Fix NPE in OptimizeSkewedJoin
1. Fix NPE in `OptimizeSkewedJoin`
2. Prevent other potential NPEs in AQE.
When the `inputRDD` of a plan has 0 partitions, the rule `OptimizeSkewedJoin` can hit an NPE, because such an RDD produces a null `MapOutputStatistics`, as shown here:
https://github.com/apache/spark/blob/d98df7626b8a88cb9a1fee4f454b19333a9f3ced/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala#L68-L69
Thus, we should take care of such NPE errors in other places too.
No
Added a test.
Closes #28153 from Ngone51/npe.
Authored-by: yi.wu <yi...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit 6cddda7847ceabd257008986ede7ec844f1f3607)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../execution/adaptive/CoalesceShufflePartitions.scala | 11 +++--------
.../sql/execution/adaptive/DemoteBroadcastHashJoin.scala | 9 ++++-----
.../sql/execution/adaptive/OptimizeSkewedJoin.scala | 15 +++++----------
.../spark/sql/execution/adaptive/QueryStageExec.scala | 10 ++++++++++
.../sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 16 +++++++++++++++-
5 files changed, 37 insertions(+), 24 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
index 226d692..096d65f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.execution.adaptive
-import org.apache.spark.MapOutputStatistics
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
@@ -54,14 +53,10 @@ case class CoalesceShufflePartitions(session: SparkSession) extends Rule[SparkPl
if (!shuffleStages.forall(_.shuffle.canChangeNumPartitions)) {
plan
} else {
- val shuffleMetrics = shuffleStages.map { stage =>
- assert(stage.resultOption.isDefined, "ShuffleQueryStageExec should already be ready")
- stage.resultOption.get.asInstanceOf[MapOutputStatistics]
- }
-
- // `ShuffleQueryStageExec` gives null mapOutputStatistics when the input RDD has 0 partitions,
+ // `ShuffleQueryStageExec#mapStats` returns None when the input RDD has 0 partitions,
// we should skip it when calculating the `partitionStartIndices`.
- val validMetrics = shuffleMetrics.filter(_ != null)
+ val validMetrics = shuffleStages.flatMap(_.mapStats)
+
// We may have different pre-shuffle partition numbers, don't reduce shuffle partition number
// in that case. For example when we union fully aggregated data (data is arranged to a single
// partition) and a result of a SortMergeJoin (multiple partitions).
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DemoteBroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DemoteBroadcastHashJoin.scala
index e564299..0f2868e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DemoteBroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DemoteBroadcastHashJoin.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.execution.adaptive
-import org.apache.spark.MapOutputStatistics
import org.apache.spark.sql.catalyst.plans.logical.{HintInfo, Join, LogicalPlan, NO_BROADCAST_HASH}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf
@@ -30,10 +29,10 @@ case class DemoteBroadcastHashJoin(conf: SQLConf) extends Rule[LogicalPlan] {
private def shouldDemote(plan: LogicalPlan): Boolean = plan match {
case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if stage.resultOption.isDefined
- && stage.resultOption.get != null =>
- val mapOutputStatistics = stage.resultOption.get.asInstanceOf[MapOutputStatistics]
- val partitionCnt = mapOutputStatistics.bytesByPartitionId.length
- val nonZeroCnt = mapOutputStatistics.bytesByPartitionId.count(_ > 0)
+ && stage.mapStats.isDefined =>
+ val mapStats = stage.mapStats.get
+ val partitionCnt = mapStats.bytesByPartitionId.length
+ val nonZeroCnt = mapStats.bytesByPartitionId.count(_ > 0)
partitionCnt > 0 && nonZeroCnt > 0 &&
(nonZeroCnt * 1.0 / partitionCnt) < conf.nonEmptyPartitionRatioForBroadcastJoin
case _ => false
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
index b09e563..d94999a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
@@ -286,16 +286,17 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
private object ShuffleStage {
def unapply(plan: SparkPlan): Option[ShuffleStageInfo] = plan match {
- case s: ShuffleQueryStageExec =>
- val mapStats = getMapStats(s)
+ case s: ShuffleQueryStageExec if s.mapStats.isDefined =>
+ val mapStats = s.mapStats.get
val sizes = mapStats.bytesByPartitionId
val partitions = sizes.zipWithIndex.map {
case (size, i) => CoalescedPartitionSpec(i, i + 1) -> size
}
Some(ShuffleStageInfo(s, mapStats, partitions))
- case CustomShuffleReaderExec(s: ShuffleQueryStageExec, partitionSpecs, _) =>
- val mapStats = getMapStats(s)
+ case CustomShuffleReaderExec(s: ShuffleQueryStageExec, partitionSpecs, _)
+ if s.mapStats.isDefined =>
+ val mapStats = s.mapStats.get
val sizes = mapStats.bytesByPartitionId
val partitions = partitionSpecs.map {
case spec @ CoalescedPartitionSpec(start, end) =>
@@ -313,12 +314,6 @@ private object ShuffleStage {
case _ => None
}
-
- private def getMapStats(stage: ShuffleQueryStageExec): MapOutputStatistics = {
- assert(stage.resultOption.isDefined, "ShuffleQueryStageExec should" +
- " already be ready when executing OptimizeSkewedPartitions rule")
- stage.resultOption.get.asInstanceOf[MapOutputStatistics]
- }
}
private case class ShuffleStageInfo(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
index d5dc1be..beaa972 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
@@ -161,6 +161,16 @@ case class ShuffleQueryStageExec(
case _ =>
}
}
+
+ /**
+ * Returns the Option[MapOutputStatistics]. If the shuffle map stage has no partition,
+ * this method returns None, as there is no map statistics.
+ */
+ def mapStats: Option[MapOutputStatistics] = {
+ assert(resultOption.isDefined, "ShuffleQueryStageExec should already be ready")
+ val stats = resultOption.get.asInstanceOf[MapOutputStatistics]
+ Option(stats)
+ }
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index d4c5b0d..bfde042 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -23,7 +23,7 @@ import java.net.URI
import org.apache.log4j.Level
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart}
-import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.execution.{ReusedSubqueryExec, ShuffledRowRDD, SparkPlan}
import org.apache.spark.sql.execution.adaptive.OptimizeLocalShuffleReader.LOCAL_SHUFFLE_READER_DESCRIPTION
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ReusedExchangeExec}
@@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildRight,
import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, StructType}
import org.apache.spark.util.Utils
class AdaptiveQueryExecSuite
@@ -781,4 +782,17 @@ class AdaptiveQueryExecSuite
}
}
}
+
+ test("SPARK-31384: avoid NPE in OptimizeSkewedJoin when there's 0 partition plan") {
+ withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ withTempView("t2") {
+ // create DataFrame with 0 partition
+ spark.createDataFrame(sparkContext.emptyRDD[Row], new StructType().add("b", IntegerType))
+ .createOrReplaceTempView("t2")
+ // should run successfully without NPE
+ runAdaptiveAndVerifyResult("SELECT * FROM testData2 t1 left semi join t2 ON t1.a=t2.b")
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org