You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/07/19 02:57:19 UTC

spark git commit: [SPARK-21273][SQL][FOLLOW-UP] Propagate logical plan stats using visitor pattern and mixin

Repository: spark
Updated Branches:
  refs/heads/master 81c99a5b9 -> ae253e5a8


[SPARK-21273][SQL][FOLLOW-UP] Propagate logical plan stats using visitor pattern and mixin

## What changes were proposed in this pull request?
This PR adds back the stats propagation of `Window` and removes the separate stats calculation for the leaf node `Range`, which is already covered by https://github.com/rxin/spark/blob/9c32d2507d3f4f269e17e841a4a4e4920b35a5e9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala#L56

## How was this patch tested?
Added two test cases (`range` and `windows`) to `BasicStatsEstimationSuite`.

Author: gatorsmile <ga...@gmail.com>

Closes #18677 from gatorsmile/visitStats.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ae253e5a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ae253e5a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ae253e5a

Branch: refs/heads/master
Commit: ae253e5a878a0adc2785ae050c49022687ac1d06
Parents: 81c99a5
Author: gatorsmile <ga...@gmail.com>
Authored: Wed Jul 19 10:57:15 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Jul 19 10:57:15 2017 +0800

----------------------------------------------------------------------
 .../plans/logical/LogicalPlanVisitor.scala      |  6 +++---
 .../statsEstimation/BasicStatsPlanVisitor.scala |  7 ++-----
 .../SizeInBytesOnlyStatsPlanVisitor.scala       |  6 ++----
 .../BasicStatsEstimationSuite.scala             | 20 ++++++++++++++++++++
 4 files changed, 27 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ae253e5a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanVisitor.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanVisitor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanVisitor.scala
index 2652f6d..e074804 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanVisitor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanVisitor.scala
@@ -35,13 +35,13 @@ trait LogicalPlanVisitor[T] {
     case p: LocalLimit => visitLocalLimit(p)
     case p: Pivot => visitPivot(p)
     case p: Project => visitProject(p)
-    case p: Range => visitRange(p)
     case p: Repartition => visitRepartition(p)
     case p: RepartitionByExpression => visitRepartitionByExpr(p)
     case p: ResolvedHint => visitHint(p)
     case p: Sample => visitSample(p)
     case p: ScriptTransformation => visitScriptTransform(p)
     case p: Union => visitUnion(p)
+    case p: Window => visitWindow(p)
     case p: LogicalPlan => default(p)
   }
 
@@ -73,8 +73,6 @@ trait LogicalPlanVisitor[T] {
 
   def visitProject(p: Project): T
 
-  def visitRange(p: Range): T
-
   def visitRepartition(p: Repartition): T
 
   def visitRepartitionByExpr(p: RepartitionByExpression): T
@@ -84,4 +82,6 @@ trait LogicalPlanVisitor[T] {
   def visitScriptTransform(p: ScriptTransformation): T
 
   def visitUnion(p: Union): T
+
+  def visitWindow(p: Window): T
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ae253e5a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/BasicStatsPlanVisitor.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/BasicStatsPlanVisitor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/BasicStatsPlanVisitor.scala
index 93908b0..4cff72d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/BasicStatsPlanVisitor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/BasicStatsPlanVisitor.scala
@@ -65,11 +65,6 @@ object BasicStatsPlanVisitor extends LogicalPlanVisitor[Statistics] {
     ProjectEstimation.estimate(p).getOrElse(fallback(p))
   }
 
-  override def visitRange(p: logical.Range): Statistics = {
-    val sizeInBytes = LongType.defaultSize * p.numElements
-    Statistics(sizeInBytes = sizeInBytes)
-  }
-
   override def visitRepartition(p: Repartition): Statistics = fallback(p)
 
   override def visitRepartitionByExpr(p: RepartitionByExpression): Statistics = fallback(p)
@@ -79,4 +74,6 @@ object BasicStatsPlanVisitor extends LogicalPlanVisitor[Statistics] {
   override def visitScriptTransform(p: ScriptTransformation): Statistics = fallback(p)
 
   override def visitUnion(p: Union): Statistics = fallback(p)
+
+  override def visitWindow(p: Window): Statistics = fallback(p)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ae253e5a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala
index 559f120..d701a95 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala
@@ -136,10 +136,6 @@ object SizeInBytesOnlyStatsPlanVisitor extends LogicalPlanVisitor[Statistics] {
 
   override def visitProject(p: Project): Statistics = visitUnaryNode(p)
 
-  override def visitRange(p: logical.Range): Statistics = {
-    p.computeStats()
-  }
-
   override def visitRepartition(p: Repartition): Statistics = default(p)
 
   override def visitRepartitionByExpr(p: RepartitionByExpression): Statistics = default(p)
@@ -160,4 +156,6 @@ object SizeInBytesOnlyStatsPlanVisitor extends LogicalPlanVisitor[Statistics] {
   override def visitUnion(p: Union): Statistics = {
     Statistics(sizeInBytes = p.children.map(_.stats.sizeInBytes).sum)
   }
+
+  override def visitWindow(p: Window): Statistics = visitUnaryNode(p)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ae253e5a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
index 913be6d..7d532ff 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.statsEstimation
 
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Literal}
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -54,6 +56,24 @@ class BasicStatsEstimationSuite extends PlanTest with StatsEstimationTestBase {
     )
   }
 
+  test("range") {
+    val range = Range(1, 5, 1, None)
+    val rangeStats = Statistics(sizeInBytes = 4 * 8)
+    checkStats(
+      range,
+      expectedStatsCboOn = rangeStats,
+      expectedStatsCboOff = rangeStats)
+  }
+
+  test("windows") {
+    val windows = plan.window(Seq(min(attribute).as('sum_attr)), Seq(attribute), Nil)
+    val windowsStats = Statistics(sizeInBytes = plan.size.get * (4 + 4 + 8) / (4 + 8))
+    checkStats(
+      windows,
+      expectedStatsCboOn = windowsStats,
+      expectedStatsCboOff = windowsStats)
+  }
+
   test("limit estimation: limit < child's rowCount") {
     val localLimit = LocalLimit(Literal(2), plan)
     val globalLimit = GlobalLimit(Literal(2), plan)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org