You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/09/29 16:52:58 UTC

[spark] branch branch-3.0 updated: [SPARK-33018][SQL] Fix estimate statistics issue if child has 0 bytes

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new ae8b35a  [SPARK-33018][SQL] Fix estimate statistics issue if child has 0 bytes
ae8b35a is described below

commit ae8b35a0d24f8c83bbbb597d668875793a8dbca6
Author: Yuming Wang <yu...@ebay.com>
AuthorDate: Tue Sep 29 16:46:04 2020 +0000

    [SPARK-33018][SQL] Fix estimate statistics issue if child has 0 bytes
    
    ### What changes were proposed in this pull request?
    
    This PR fixes the statistics estimation issue that occurs when a child plan has 0 bytes.
    
    ### Why are the changes needed?
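    The `sizeInBytes` of a plan can be `0` when AQE and CBO are enabled (`spark.sql.adaptive.enabled`=true, `spark.sql.cbo.enabled`=true and `spark.sql.cbo.planStats.enabled`=true). Because the default size estimate is the product of the children's `sizeInBytes`, a single 0-byte child makes the whole estimate `0`, so an incorrect BroadcastJoin can be generated, resulting in a driver OOM. For example: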
    ![SPARK-33018](https://user-images.githubusercontent.com/5399861/94457606-647e3d00-01e7-11eb-85ee-812ae6efe7bb.jpg)
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Manual test, plus a new unit test in `JoinEstimationSuite`.
    
    Closes #29894 from wangyum/SPARK-33018.
    
    Authored-by: Yuming Wang <yu...@ebay.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 711d8dd28afd9af92b025f9908534e5f1d575042)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../SizeInBytesOnlyStatsPlanVisitor.scala          |  3 ++-
 .../statsEstimation/JoinEstimationSuite.scala      | 22 ++++++++++++++++++++++
 .../statsEstimation/StatsEstimationTestBase.scala  |  9 ++++++---
 3 files changed, 30 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala
index da36db7..a586988 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala
@@ -53,7 +53,8 @@ object SizeInBytesOnlyStatsPlanVisitor extends LogicalPlanVisitor[Statistics] {
    */
   override def default(p: LogicalPlan): Statistics = p match {
     case p: LeafNode => p.computeStats()
-    case _: LogicalPlan => Statistics(sizeInBytes = p.children.map(_.stats.sizeInBytes).product)
+    case _: LogicalPlan =>
+      Statistics(sizeInBytes = p.children.map(_.stats.sizeInBytes).filter(_ > 0L).product)
   }
 
   override def visitAggregate(p: Aggregate): Statistics = {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala
index 6c5a2b2..cdfc863 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala
@@ -551,4 +551,26 @@ class JoinEstimationSuite extends StatsEstimationTestBase {
       attributeStats = AttributeMap(Nil))
     assert(join.stats == expectedStats)
   }
+
+  test("SPARK-33018 Fix estimate statistics issue if child has 0 bytes") {
+    case class MyStatsTestPlan(
+        outputList: Seq[Attribute],
+        sizeInBytes: BigInt) extends LeafNode {
+      override def output: Seq[Attribute] = outputList
+      override def computeStats(): Statistics = Statistics(sizeInBytes = sizeInBytes)
+    }
+
+    val left = MyStatsTestPlan(
+      outputList = Seq("key-1-2", "key-2-4").map(nameToAttr),
+      sizeInBytes = BigInt(100))
+
+    val right = MyStatsTestPlan(
+      outputList = Seq("key-1-2", "key-2-3").map(nameToAttr),
+      sizeInBytes = BigInt(0))
+
+    val join = Join(left, right, LeftOuter,
+      Some(EqualTo(nameToAttr("key-2-4"), nameToAttr("key-2-3"))), JoinHint.NONE)
+
+    assert(join.stats == Statistics(sizeInBytes = 100))
+  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala
index 9dceca5..0a27e31 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala
@@ -26,17 +26,20 @@ import org.apache.spark.sql.types.{IntegerType, StringType}
 
 trait StatsEstimationTestBase extends SparkFunSuite {
 
-  var originalValue: Boolean = false
+  var originalCBOValue: Boolean = false
+  var originalPlanStatsValue: Boolean = false
 
   override def beforeAll(): Unit = {
     super.beforeAll()
     // Enable stats estimation based on CBO.
-    originalValue = SQLConf.get.getConf(SQLConf.CBO_ENABLED)
+    originalCBOValue = SQLConf.get.getConf(SQLConf.CBO_ENABLED)
+    originalPlanStatsValue = SQLConf.get.getConf(SQLConf.PLAN_STATS_ENABLED)
     SQLConf.get.setConf(SQLConf.CBO_ENABLED, true)
   }
 
   override def afterAll(): Unit = {
-    SQLConf.get.setConf(SQLConf.CBO_ENABLED, originalValue)
+    SQLConf.get.setConf(SQLConf.CBO_ENABLED, originalCBOValue)
+    SQLConf.get.setConf(SQLConf.PLAN_STATS_ENABLED, originalPlanStatsValue)
     super.afterAll()
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org