You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/01/23 22:56:31 UTC
spark git commit: [SPARK-23192][SQL] Keep the Hint after Using Cached Data
Repository: spark
Updated Branches:
refs/heads/master 05839d164 -> 613c29033
[SPARK-23192][SQL] Keep the Hint after Using Cached Data
## What changes were proposed in this pull request?
The hint on a plan segment is lost if that segment is replaced by cached data. For example, in the code below the `broadcast` hint on `df2` is dropped because `df2` has been cached:
```Scala
val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value")
val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value")
df2.cache()
val df3 = df1.join(broadcast(df2), Seq("key"), "inner")
```
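This PR fixes it: when `CacheManager.useCachedData` substitutes cached data for a `ResolvedHint` node, it now wraps the cached plan in a new `ResolvedHint` that carries the original hints.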
## How was this patch tested?
Added a test to `BroadcastJoinSuite` that checks the broadcast hint is retained after using the cached data.
Author: gatorsmile <ga...@gmail.com>
Closes #20365 from gatorsmile/fixBroadcastHintloss.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/613c2903
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/613c2903
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/613c2903
Branch: refs/heads/master
Commit: 613c290336e3826111164c24319f66774b1f65a3
Parents: 05839d1
Author: gatorsmile <ga...@gmail.com>
Authored: Tue Jan 23 14:56:28 2018 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Tue Jan 23 14:56:28 2018 -0800
----------------------------------------------------------------------
.../org/apache/spark/sql/execution/CacheManager.scala | 12 ++++++++----
.../spark/sql/execution/joins/BroadcastJoinSuite.scala | 13 +++++++++++++
2 files changed, 21 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/613c2903/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index b05fe49..432eb59 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ResolvedHint}
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.storage.StorageLevel
@@ -170,9 +170,13 @@ class CacheManager extends Logging {
def useCachedData(plan: LogicalPlan): LogicalPlan = {
val newPlan = plan transformDown {
case currentFragment =>
- lookupCachedData(currentFragment)
- .map(_.cachedRepresentation.withOutput(currentFragment.output))
- .getOrElse(currentFragment)
+ lookupCachedData(currentFragment).map { cached =>
+ val cachedPlan = cached.cachedRepresentation.withOutput(currentFragment.output)
+ currentFragment match {
+ case hint: ResolvedHint => ResolvedHint(cachedPlan, hint.hints)
+ case _ => cachedPlan
+ }
+ }.getOrElse(currentFragment)
}
newPlan transformAllExpressions {
http://git-wip-us.apache.org/repos/asf/spark/blob/613c2903/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
index 0bcd54e..1704bc8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
@@ -109,6 +109,19 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils {
}
}
+ test("broadcast hint is retained after using the cached data") {
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value")
+ val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value")
+ df2.cache()
+ val df3 = df1.join(broadcast(df2), Seq("key"), "inner")
+ val numBroadCastHashJoin = df3.queryExecution.executedPlan.collect {
+ case b: BroadcastHashJoinExec => b
+ }.size
+ assert(numBroadCastHashJoin === 1)
+ }
+ }
+
test("broadcast hint isn't propagated after a join") {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org