You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/01/24 00:17:13 UTC
spark git commit: [SPARK-23195][SQL] Keep the Hint of Cached Data
Repository: spark
Updated Branches:
refs/heads/master 613c29033 -> 44cc4daf3
[SPARK-23195][SQL] Keep the Hint of Cached Data
## What changes were proposed in this pull request?
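The broadcast hint of a plan is lost when the plan is cached and the cached data is then materialized. At that point `InMemoryRelation` reports statistics built from the cached batches' size, and those statistics do not carry the original plan's hints. This PR fixes the issue by propagating the hints from the statistics of the plan being cached.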
```Scala
val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value")
val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value")
broadcast(df2).cache()
df2.collect()
val df3 = df1.join(df2, Seq("key"), "inner")
```
## How was this patch tested?
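Added a test to `BroadcastJoinSuite`. It checks that a join against a cached, broadcast-hinted DataFrame still plans a `BroadcastHashJoinExec`, both before and after the cached data is materialized, with automatic broadcast joins disabled.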
Author: gatorsmile <ga...@gmail.com>
Closes #20368 from gatorsmile/cachedBroadcastHint.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/44cc4daf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/44cc4daf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/44cc4daf
Branch: refs/heads/master
Commit: 44cc4daf3a03f1a220eef8ce3c86867745db9ab7
Parents: 613c290
Author: gatorsmile <ga...@gmail.com>
Authored: Tue Jan 23 16:17:09 2018 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Tue Jan 23 16:17:09 2018 -0800
----------------------------------------------------------------------
.../sql/execution/columnar/InMemoryRelation.scala | 4 ++--
.../sql/execution/joins/BroadcastJoinSuite.scala | 16 ++++++++++++++++
2 files changed, 18 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/44cc4daf/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 51928d9..5945808 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -63,7 +63,7 @@ case class InMemoryRelation(
tableName: Option[String])(
@transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator,
- statsOfPlanToCache: Statistics = null)
+ statsOfPlanToCache: Statistics)
extends logical.LeafNode with MultiInstanceRelation {
override protected def innerChildren: Seq[SparkPlan] = Seq(child)
@@ -77,7 +77,7 @@ case class InMemoryRelation(
// Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache
statsOfPlanToCache
} else {
- Statistics(sizeInBytes = batchStats.value.longValue)
+ Statistics(sizeInBytes = batchStats.value.longValue, hints = statsOfPlanToCache.hints)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/44cc4daf/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
index 1704bc8..889cab0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
@@ -139,6 +139,22 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils {
}
}
+ test("broadcast hint is retained in a cached plan") {
+ Seq(true, false).foreach { materialized =>
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value")
+ val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value")
+ broadcast(df2).cache()
+ if (materialized) df2.collect()
+ val df3 = df1.join(df2, Seq("key"), "inner")
+ val numBroadCastHashJoin = df3.queryExecution.executedPlan.collect {
+ case b: BroadcastHashJoinExec => b
+ }.size
+ assert(numBroadCastHashJoin === 1)
+ }
+ }
+ }
+
private def assertBroadcastJoin(df : Dataset[Row]) : Unit = {
val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value")
val joined = df1.join(df, Seq("key"), "inner")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org