You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/01/24 00:17:13 UTC

spark git commit: [SPARK-23195][SQL] Keep the Hint of Cached Data

Repository: spark
Updated Branches:
  refs/heads/master 613c29033 -> 44cc4daf3


[SPARK-23195][SQL] Keep the Hint of Cached Data

## What changes were proposed in this pull request?
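The broadcast hint on a plan is lost after the plan is cached: once the cached data has been materialized, `InMemoryRelation` builds its statistics from the size of the cached batches only and drops the hints of the original plan. This PR fixes that by carrying the original plan's hints over into those statistics.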

```Scala
  val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value")
  val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value")
  broadcast(df2).cache()
  df2.collect()
  val df3 = df1.join(df2, Seq("key"), "inner")
```

## How was this patch tested?
Added a test.

Author: gatorsmile <ga...@gmail.com>

Closes #20368 from gatorsmile/cachedBroadcastHint.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/44cc4daf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/44cc4daf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/44cc4daf

Branch: refs/heads/master
Commit: 44cc4daf3a03f1a220eef8ce3c86867745db9ab7
Parents: 613c290
Author: gatorsmile <ga...@gmail.com>
Authored: Tue Jan 23 16:17:09 2018 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Tue Jan 23 16:17:09 2018 -0800

----------------------------------------------------------------------
 .../sql/execution/columnar/InMemoryRelation.scala   |  4 ++--
 .../sql/execution/joins/BroadcastJoinSuite.scala    | 16 ++++++++++++++++
 2 files changed, 18 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/44cc4daf/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 51928d9..5945808 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -63,7 +63,7 @@ case class InMemoryRelation(
     tableName: Option[String])(
     @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
     val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator,
-    statsOfPlanToCache: Statistics = null)
+    statsOfPlanToCache: Statistics)
   extends logical.LeafNode with MultiInstanceRelation {
 
   override protected def innerChildren: Seq[SparkPlan] = Seq(child)
@@ -77,7 +77,7 @@ case class InMemoryRelation(
       // Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache
       statsOfPlanToCache
     } else {
-      Statistics(sizeInBytes = batchStats.value.longValue)
+      Statistics(sizeInBytes = batchStats.value.longValue, hints = statsOfPlanToCache.hints)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/44cc4daf/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
index 1704bc8..889cab0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
@@ -139,6 +139,22 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils {
     }
   }
 
+  test("broadcast hint is retained in a cached plan") {
+    Seq(true, false).foreach { materialized =>
+      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+        val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value")
+        val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value")
+        broadcast(df2).cache()
+        if (materialized) df2.collect()
+        val df3 = df1.join(df2, Seq("key"), "inner")
+        val numBroadCastHashJoin = df3.queryExecution.executedPlan.collect {
+          case b: BroadcastHashJoinExec => b
+        }.size
+        assert(numBroadCastHashJoin === 1)
+      }
+    }
+  }
+
   private def assertBroadcastJoin(df : Dataset[Row]) : Unit = {
     val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value")
     val joined = df1.join(df, Seq("key"), "inner")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org