Posted to commits@spark.apache.org by do...@apache.org on 2019/04/15 22:45:16 UTC

[spark] branch branch-2.4 updated: [SPARK-27351][SQL] Wrong outputRows estimation after AggregateEstimation wit…

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 40668c5  [SPARK-27351][SQL] Wrong outputRows estimation after AggregateEstimation wit…
40668c5 is described below

commit 40668c53ed799881db1f316ceaf2f978b294d8ed
Author: pengbo <bo...@gmail.com>
AuthorDate: Mon Apr 15 15:37:07 2019 -0700

    [SPARK-27351][SQL] Wrong outputRows estimation after AggregateEstimation wit…
    
    ## What changes were proposed in this pull request?
    The upper bound on the number of rows produced by a group-by is the product of the distinct counts of the group-by columns. However, a column that contains only null values has a distinct count of 0, which drives the estimated output row count to 0; this is incorrect, because the null values still form one group. Example (a simplified sketch of the fixed estimation follows the example):
    col1 (distinct: 2, rowCount: 2)
    col2 (distinct: 0, rowCount: 2)
    => group by col1, col2
    Actual: output rows: 0
    Expected: output rows: 2
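    
    A minimal, self-contained sketch of the fixed estimation logic (the `ColStat` case class and `estimateGroupByRows` helper are illustrative names, not the actual Spark API; the real change is in AggregateEstimation.scala below):
    
    ```scala
    // Illustrative sketch only; names are hypothetical, not the Spark API.
    case class ColStat(distinctCount: BigInt, nullCount: BigInt)
    
    // Upper bound on the number of groups: the product of per-column distinct counts.
    // A column whose values are all null still produces one group, so it should
    // contribute a factor of 1 rather than 0.
    def estimateGroupByRows(groupByStats: Seq[ColStat], childRowCount: BigInt): BigInt = {
      val product = groupByStats.foldLeft(BigInt(1)) { (res, stat) =>
        val factor =
          if (stat.distinctCount == 0 && stat.nullCount > 0) BigInt(1)
          else stat.distinctCount
        res * factor
      }
      // Cap at the number of input rows (the real estimator applies a similar cap).
      product.min(childRowCount)
    }
    
    // col1: 2 distinct values; col2: all nulls over 2 rows.
    // Before the fix: 2 * 0 = 0. After the fix: 2 * 1 = 2.
    estimateGroupByRows(Seq(ColStat(2, 0), ColStat(0, 2)), childRowCount = 2)  // => 2
    ```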
    
    ## How was this patch tested?
    A corresponding unit test has been added, and manual testing has been done in our TPC-DS benchmark environment.
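    
    For reference, one way to observe the estimate end-to-end (a hedged sketch, not part of this patch; the table and column names are illustrative, and it assumes column statistics are available and spark.sql.cbo.enabled is true):
    
    ```scala
    // Illustrative only: assumes a catalog table "t" with columns col1 and col2 exists.
    spark.conf.set("spark.sql.cbo.enabled", "true")
    spark.sql("ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS col1, col2")
    
    val grouped = spark.table("t").groupBy("col1", "col2").count()
    // With CBO enabled, the optimized plan carries the estimated row count;
    // an all-null col2 should no longer collapse the estimate to 0.
    println(grouped.queryExecution.optimizedPlan.stats.rowCount)
    ```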
    
    Closes #24286 from pengbo/master.
    
    Lead-authored-by: pengbo <bo...@gmail.com>
    Co-authored-by: mingbo_pb <mi...@alibaba-inc.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit c58a4fed8d79aff9fbac9f9a33141b2edbfb0cea)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../plans/logical/statsEstimation/AggregateEstimation.scala  | 12 ++++++++++--
 .../catalyst/statsEstimation/AggregateEstimationSuite.scala  | 12 +++++++++++-
 2 files changed, 21 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
index 111c594..7ef22fa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
@@ -39,8 +39,16 @@ object AggregateEstimation {
       // Multiply distinct counts of group-by columns. This is an upper bound, which assumes
       // the data contains all combinations of distinct values of group-by columns.
       var outputRows: BigInt = agg.groupingExpressions.foldLeft(BigInt(1))(
-        (res, expr) => res *
-          childStats.attributeStats(expr.asInstanceOf[Attribute]).distinctCount.get)
+        (res, expr) => {
+          val columnStat = childStats.attributeStats(expr.asInstanceOf[Attribute])
+          val distinctCount = columnStat.distinctCount.get
+          val distinctValue: BigInt = if (distinctCount == 0 && columnStat.nullCount.get > 0) {
+            1
+          } else {
+            distinctCount
+          }
+          res * distinctValue
+        })
 
       outputRows = if (agg.groupingExpressions.isEmpty) {
         // If there's no group-by columns, the output is a single row containing values of aggregate
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
index 8213d56..6bdf8cd 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
@@ -38,7 +38,9 @@ class AggregateEstimationSuite extends StatsEstimationTestBase with PlanTest {
     attr("key22") -> ColumnStat(distinctCount = Some(2), min = Some(10), max = Some(20),
       nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
     attr("key31") -> ColumnStat(distinctCount = Some(0), min = None, max = None,
-      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
+      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+    attr("key32") -> ColumnStat(distinctCount = Some(0), min = None, max = None,
+      nullCount = Some(4), avgLen = Some(4), maxLen = Some(4))
   ))
 
   private val nameToAttr: Map[String, Attribute] = columnInfo.map(kv => kv._1.name -> kv._1)
@@ -92,6 +94,14 @@ class AggregateEstimationSuite extends StatsEstimationTestBase with PlanTest {
       expectedOutputRowCount = 0)
   }
 
+  test("group-by column with only null value") {
+    checkAggStats(
+      tableColumns = Seq("key22", "key32"),
+      tableRowCount = 6,
+      groupByColumns = Seq("key22", "key32"),
+      expectedOutputRowCount = nameToColInfo("key22")._2.distinctCount.get)
+  }
+
   test("non-cbo estimation") {
     val attributes = Seq("key12").map(nameToAttr)
     val child = StatsTestPlan(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org