Posted to commits@spark.apache.org by ma...@apache.org on 2014/09/21 01:30:59 UTC

git commit: [SPARK-3609][SQL] Adds sizeInBytes statistics for Limit operator when all output attributes are of native data types

Repository: spark
Updated Branches:
  refs/heads/master 7c8ad1c08 -> 7f54580c4


[SPARK-3609][SQL] Adds sizeInBytes statistics for Limit operator when all output attributes are of native data types

This helps to replace shuffled hash joins with broadcast hash joins in some cases.
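
For illustration, a minimal sketch of the pattern this enables, taken from the
PlannerSuite test added below (testData is the standard (key: Int, value: String)
test table):

    // The LIMIT side now carries a small, concrete sizeInBytes estimate,
    // so the planner can choose BroadcastHashJoin for it automatically
    // instead of falling back to ShuffledHashJoin.
    val a = testData.as('a)
    val b = testData.limit(3).as('b)
    val planned = a.join(b, Inner, Some("a.key".attr === "b.key".attr))
      .queryExecution.executedPlan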

Author: Cheng Lian <li...@gmail.com>

Closes #2468 from liancheng/more-stats and squashes the following commits:

32687dc [Cheng Lian] Moved the test case to PlannerSuite
5595a91 [Cheng Lian] Removes debugging code
73faf69 [Cheng Lian] Test case for auto choosing broadcast hash join
f30fe1d [Cheng Lian] Adds sizeInBytes estimation for Limit when all output types are native types


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7f54580c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7f54580c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7f54580c

Branch: refs/heads/master
Commit: 7f54580c4503d8b6bfcf7d4cbc83b83458140926
Parents: 7c8ad1c
Author: Cheng Lian <li...@gmail.com>
Authored: Sat Sep 20 16:30:49 2014 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Sat Sep 20 16:30:49 2014 -0700

----------------------------------------------------------------------
 .../catalyst/plans/logical/basicOperators.scala | 11 +++++++++++
 .../spark/sql/catalyst/types/dataTypes.scala    | 10 ++++++++++
 .../org/apache/spark/sql/SQLQuerySuite.scala    |  9 +++++----
 .../spark/sql/execution/PlannerSuite.scala      | 20 +++++++++++++++++++-
 4 files changed, 45 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7f54580c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 5d10754..8e8259c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -148,6 +148,17 @@ case class Aggregate(
 
 case class Limit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
   override def output = child.output
+
+  override lazy val statistics: Statistics =
+    if (output.forall(_.dataType.isInstanceOf[NativeType])) {
+      val limit = limitExpr.eval(null).asInstanceOf[Int]
+      val sizeInBytes = (limit: Long) * output.map { a =>
+        NativeType.defaultSizeOf(a.dataType.asInstanceOf[NativeType])
+      }.sum
+      Statistics(sizeInBytes = sizeInBytes)
+    } else {
+      Statistics(sizeInBytes = children.map(_.statistics).map(_.sizeInBytes).product)
+    }
 }
 
 case class Subquery(alias: String, child: LogicalPlan) extends UnaryNode {
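
To make the new estimate concrete: for testData, whose rows are
(key: Int, value: String), the per-type defaults added in dataTypes.scala below
give the following arithmetic (a sketch of what Limit.statistics computes, not
code from the patch):

    val limit = 3                                 // LIMIT 3
    val perRowBytes = 4 + 4096                    // IntegerType + StringType defaults
    val sizeInBytes = limit.toLong * perRowBytes  // = 12300 bytes

12300 bytes is well under the 81920-byte broadcast threshold the new
PlannerSuite test sets, so the planner is free to broadcast the LIMIT side.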

http://git-wip-us.apache.org/repos/asf/spark/blob/7f54580c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
index 49520b7..e3050e5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
@@ -122,6 +122,16 @@ object NativeType {
     IntegerType, BooleanType, LongType, DoubleType, FloatType, ShortType, ByteType, StringType)
 
   def unapply(dt: DataType): Boolean = all.contains(dt)
+
+  val defaultSizeOf: Map[NativeType, Int] = Map(
+    IntegerType -> 4,
+    BooleanType -> 1,
+    LongType -> 8,
+    DoubleType -> 8,
+    FloatType -> 4,
+    ShortType -> 2,
+    ByteType -> 1,
+    StringType -> 4096)
 }
 
 trait PrimitiveType extends DataType {
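
The map is keyed by the NativeType singletons, so lookups are direct (a usage
sketch; the 4096 figure for StringType reads as a conservative per-string
heuristic, since strings are variable-length):

    NativeType.defaultSizeOf(IntegerType)  // 4
    NativeType.defaultSizeOf(LongType)     // 8
    NativeType.defaultSizeOf(StringType)   // 4096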

http://git-wip-us.apache.org/repos/asf/spark/blob/7f54580c/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 67563b6..15f6bce 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql
 
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.{ShuffledHashJoin, BroadcastHashJoin}
 import org.apache.spark.sql.test._
 import org.scalatest.BeforeAndAfterAll
 import java.util.TimeZone
@@ -649,24 +650,24 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
       (3, null) ::
       (4, 2147483644) :: Nil)
   }
-  
+
   test("SPARK-3423 BETWEEN") {
     checkAnswer(
       sql("SELECT key, value FROM testData WHERE key BETWEEN 5 and 7"),
       Seq((5, "5"), (6, "6"), (7, "7"))
     )
-    
+
     checkAnswer(
       sql("SELECT key, value FROM testData WHERE key BETWEEN 7 and 7"),
       Seq((7, "7"))
     )
-    
+
     checkAnswer(
       sql("SELECT key, value FROM testData WHERE key BETWEEN 9 and 7"),
       Seq()
     )
   }
-    
+
   test("cast boolean to string") {
     // TODO Ensure true/false string letter casing is consistent with Hive in all cases.
     checkAnswer(

http://git-wip-us.apache.org/repos/asf/spark/blob/7f54580c/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 37d64f0..bfbf431 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -22,7 +22,7 @@ import org.scalatest.FunSuite
 import org.apache.spark.sql.TestData._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.execution
+import org.apache.spark.sql.{SQLConf, execution}
 import org.apache.spark.sql.test.TestSQLContext._
 import org.apache.spark.sql.test.TestSQLContext.planner._
 
@@ -57,4 +57,22 @@ class PlannerSuite extends FunSuite {
     val planned = HashAggregation(query)
     assert(planned.nonEmpty)
   }
+
+  test("sizeInBytes estimation of limit operator for broadcast hash join optimization") {
+    val origThreshold = autoBroadcastJoinThreshold
+    setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, 81920.toString)
+
+    // Using a threshold that is definitely larger than the small testing table (b) below
+    val a = testData.as('a)
+    val b = testData.limit(3).as('b)
+    val planned = a.join(b, Inner, Some("a.key".attr === "b.key".attr)).queryExecution.executedPlan
+
+    val broadcastHashJoins = planned.collect { case join: BroadcastHashJoin => join }
+    val shuffledHashJoins = planned.collect { case join: ShuffledHashJoin => join }
+
+    assert(broadcastHashJoins.size === 1, "Should use broadcast hash join")
+    assert(shuffledHashJoins.isEmpty, "Should not use shuffled hash join")
+
+    setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, origThreshold.toString)
+  }
 }
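
For reference, only the LIMIT side qualifies for broadcasting in this test: the
un-limited side (a) falls back to the default size estimate, which in this era
of Spark SQL is chosen to exceed the broadcast threshold so that tables of
unknown size are never broadcast by accident. The planner-side check is roughly
the following (a sketch of the HashJoin strategy of this period, not part of
this patch):

    // Broadcast the right side when its estimated size fits under the
    // configured threshold; otherwise fall back to a shuffled hash join.
    if (right.statistics.sizeInBytes <= sqlContext.autoBroadcastJoinThreshold) {
      // plan a BroadcastHashJoin with the right side as the broadcast side
    }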

