Posted to commits@spark.apache.org by we...@apache.org on 2020/04/10 08:36:59 UTC

[spark] branch branch-3.0 updated: [SPARK-31384][SQL] Fix NPE in OptimizeSkewedJoin

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 6c9827f  [SPARK-31384][SQL] Fix NPE in OptimizeSkewedJoin
6c9827f is described below

commit 6c9827f5c20ba2d477eab28288039abb2baf9324
Author: yi.wu <yi...@databricks.com>
AuthorDate: Fri Apr 10 08:16:48 2020 +0000

    [SPARK-31384][SQL] Fix NPE in OptimizeSkewedJoin
    
    1. Fix NPE in `OptimizeSkewedJoin`.
    
    2. Prevent other potential NPE errors in AQE.
    
    When a plan's `inputRDD` has 0 partitions, the rule `OptimizeSkewedJoin` can hit an NPE, because such an RDD yields a null `MapOutputStatistics` due to:
    
    https://github.com/apache/spark/blob/d98df7626b8a88cb9a1fee4f454b19333a9f3ced/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala#L68-L69
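
    For reference, the lines linked above look roughly like this (paraphrased
    here for illustration; not copied from this patch):

        // ShuffleExchangeExec: a 0-partition input RDD has no map stage to
        // submit, so the statistics future completes with null.
        @transient lazy val mapOutputStatisticsFuture: Future[MapOutputStatistics] = {
          if (inputRDD.getNumPartitions == 0) {
            Future.successful(null)
          } else {
            sparkContext.submitMapStage(shuffleDependency)
          }
        }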
    
    Thus, we should also guard against such NPEs in the other places in AQE that read these statistics.
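
    For example, the old access pattern that this patch replaces dereferences
    the null statistics (a sketch of the removed code; `asInstanceOf` on null
    simply yields null, and the NPE is thrown by the subsequent field access):

        val stats = stage.resultOption.get.asInstanceOf[MapOutputStatistics]
        val sizes = stats.bytesByPartitionId  // NullPointerException when stats is null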
    
    No user-facing change.
    
    Added a test in `AdaptiveQueryExecSuite`.
    
    Closes #28153 from Ngone51/npe.
    
    Authored-by: yi.wu <yi...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 6cddda7847ceabd257008986ede7ec844f1f3607)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../execution/adaptive/CoalesceShufflePartitions.scala   | 11 +++--------
 .../sql/execution/adaptive/DemoteBroadcastHashJoin.scala |  9 ++++-----
 .../sql/execution/adaptive/OptimizeSkewedJoin.scala      | 15 +++++----------
 .../spark/sql/execution/adaptive/QueryStageExec.scala    | 10 ++++++++++
 .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala  | 16 +++++++++++++++-
 5 files changed, 37 insertions(+), 24 deletions(-)
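
The crux of the fix, shown in the hunks below, is the new
`ShuffleQueryStageExec.mapStats` accessor, which wraps the possibly-null
statistics in an `Option` so that call sites guard or `flatMap` instead of
casting. A condensed sketch of the pattern:

    // Option(null) == None, so a 0-partition shuffle surfaces as None
    // rather than as a null value to be cast and dereferenced.
    def mapStats: Option[MapOutputStatistics] = {
      assert(resultOption.isDefined, "ShuffleQueryStageExec should already be ready")
      Option(resultOption.get.asInstanceOf[MapOutputStatistics])
    }

    // e.g. in CoalesceShufflePartitions, empty stages are skipped safely:
    val validMetrics = shuffleStages.flatMap(_.mapStats)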

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
index 226d692..096d65f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.execution.adaptive
 
-import org.apache.spark.MapOutputStatistics
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.SparkPlan
@@ -54,14 +53,10 @@ case class CoalesceShufflePartitions(session: SparkSession) extends Rule[SparkPl
     if (!shuffleStages.forall(_.shuffle.canChangeNumPartitions)) {
       plan
     } else {
-      val shuffleMetrics = shuffleStages.map { stage =>
-        assert(stage.resultOption.isDefined, "ShuffleQueryStageExec should already be ready")
-        stage.resultOption.get.asInstanceOf[MapOutputStatistics]
-      }
-
-      // `ShuffleQueryStageExec` gives null mapOutputStatistics when the input RDD has 0 partitions,
+      // `ShuffleQueryStageExec#mapStats` returns None when the input RDD has 0 partitions,
       // we should skip it when calculating the `partitionStartIndices`.
-      val validMetrics = shuffleMetrics.filter(_ != null)
+      val validMetrics = shuffleStages.flatMap(_.mapStats)
+
       // We may have different pre-shuffle partition numbers, don't reduce shuffle partition number
       // in that case. For example when we union fully aggregated data (data is arranged to a single
       // partition) and a result of a SortMergeJoin (multiple partitions).
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DemoteBroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DemoteBroadcastHashJoin.scala
index e564299..0f2868e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DemoteBroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DemoteBroadcastHashJoin.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.execution.adaptive
 
-import org.apache.spark.MapOutputStatistics
 import org.apache.spark.sql.catalyst.plans.logical.{HintInfo, Join, LogicalPlan, NO_BROADCAST_HASH}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
@@ -30,10 +29,10 @@ case class DemoteBroadcastHashJoin(conf: SQLConf) extends Rule[LogicalPlan] {
 
   private def shouldDemote(plan: LogicalPlan): Boolean = plan match {
     case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if stage.resultOption.isDefined
-      && stage.resultOption.get != null =>
-      val mapOutputStatistics = stage.resultOption.get.asInstanceOf[MapOutputStatistics]
-      val partitionCnt = mapOutputStatistics.bytesByPartitionId.length
-      val nonZeroCnt = mapOutputStatistics.bytesByPartitionId.count(_ > 0)
+      && stage.mapStats.isDefined =>
+      val mapStats = stage.mapStats.get
+      val partitionCnt = mapStats.bytesByPartitionId.length
+      val nonZeroCnt = mapStats.bytesByPartitionId.count(_ > 0)
       partitionCnt > 0 && nonZeroCnt > 0 &&
         (nonZeroCnt * 1.0 / partitionCnt) < conf.nonEmptyPartitionRatioForBroadcastJoin
     case _ => false
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
index b09e563..d94999a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
@@ -286,16 +286,17 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
 
 private object ShuffleStage {
   def unapply(plan: SparkPlan): Option[ShuffleStageInfo] = plan match {
-    case s: ShuffleQueryStageExec =>
-      val mapStats = getMapStats(s)
+    case s: ShuffleQueryStageExec if s.mapStats.isDefined =>
+      val mapStats = s.mapStats.get
       val sizes = mapStats.bytesByPartitionId
       val partitions = sizes.zipWithIndex.map {
         case (size, i) => CoalescedPartitionSpec(i, i + 1) -> size
       }
       Some(ShuffleStageInfo(s, mapStats, partitions))
 
-    case CustomShuffleReaderExec(s: ShuffleQueryStageExec, partitionSpecs, _) =>
-      val mapStats = getMapStats(s)
+    case CustomShuffleReaderExec(s: ShuffleQueryStageExec, partitionSpecs, _)
+      if s.mapStats.isDefined =>
+      val mapStats = s.mapStats.get
       val sizes = mapStats.bytesByPartitionId
       val partitions = partitionSpecs.map {
         case spec @ CoalescedPartitionSpec(start, end) =>
@@ -313,12 +314,6 @@ private object ShuffleStage {
 
     case _ => None
   }
-
-  private def getMapStats(stage: ShuffleQueryStageExec): MapOutputStatistics = {
-    assert(stage.resultOption.isDefined, "ShuffleQueryStageExec should" +
-      " already be ready when executing OptimizeSkewedPartitions rule")
-    stage.resultOption.get.asInstanceOf[MapOutputStatistics]
-  }
 }
 
 private case class ShuffleStageInfo(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
index d5dc1be..beaa972 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
@@ -161,6 +161,16 @@ case class ShuffleQueryStageExec(
       case _ =>
     }
   }
+
+  /**
+   * Returns the `Option[MapOutputStatistics]`. If the shuffle map stage has 0 partitions,
+   * this method returns None, as there are no map statistics.
+   */
+  def mapStats: Option[MapOutputStatistics] = {
+    assert(resultOption.isDefined, "ShuffleQueryStageExec should already be ready")
+    val stats = resultOption.get.asInstanceOf[MapOutputStatistics]
+    Option(stats)
+  }
 }
 
 /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index d4c5b0d..bfde042 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -23,7 +23,7 @@ import java.net.URI
 import org.apache.log4j.Level
 
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart}
-import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.execution.{ReusedSubqueryExec, ShuffledRowRDD, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.OptimizeLocalShuffleReader.LOCAL_SHUFFLE_READER_DESCRIPTION
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ReusedExchangeExec}
@@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildRight,
 import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, StructType}
 import org.apache.spark.util.Utils
 
 class AdaptiveQueryExecSuite
@@ -781,4 +782,17 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+
+  test("SPARK-31384: avoid NPE in OptimizeSkewedJoin when there's 0 partition plan") {
+    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+      withTempView("t2") {
+        // create DataFrame with 0 partition
+        spark.createDataFrame(sparkContext.emptyRDD[Row], new StructType().add("b", IntegerType))
+          .createOrReplaceTempView("t2")
+        // should run successfully without NPE
+        runAdaptiveAndVerifyResult("SELECT * FROM testData2 t1 left semi join t2 ON t1.a=t2.b")
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org