You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/01/23 22:56:31 UTC

spark git commit: [SPARK-23192][SQL] Keep the Hint after Using Cached Data

Repository: spark
Updated Branches:
  refs/heads/master 05839d164 -> 613c29033


[SPARK-23192][SQL] Keep the Hint after Using Cached Data

## What changes were proposed in this pull request?

The hint on a plan segment is lost if that plan segment is replaced by cached data. For example, in the following query the broadcast hint on `df2` is dropped because `df2` is cached:

```Scala
      val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value")
      val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value")
      df2.cache()
      val df3 = df1.join(broadcast(df2), Seq("key"), "inner")
```

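This PR fixes the issue: when the plan fragment being replaced is a `ResolvedHint`, `CacheManager.useCachedData` now wraps the cached plan in a `ResolvedHint` carrying the same hints.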

## How was this patch tested?
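Added a test to `BroadcastJoinSuite` that checks the broadcast hint is retained after the cached data is used (with auto-broadcast disabled, the plan still contains exactly one `BroadcastHashJoinExec`).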

Author: gatorsmile <ga...@gmail.com>

Closes #20365 from gatorsmile/fixBroadcastHintloss.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/613c2903
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/613c2903
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/613c2903

Branch: refs/heads/master
Commit: 613c290336e3826111164c24319f66774b1f65a3
Parents: 05839d1
Author: gatorsmile <ga...@gmail.com>
Authored: Tue Jan 23 14:56:28 2018 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Tue Jan 23 14:56:28 2018 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/execution/CacheManager.scala  | 12 ++++++++----
 .../spark/sql/execution/joins/BroadcastJoinSuite.scala | 13 +++++++++++++
 2 files changed, 21 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/613c2903/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index b05fe49..432eb59 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ResolvedHint}
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.storage.StorageLevel
@@ -170,9 +170,13 @@ class CacheManager extends Logging {
   def useCachedData(plan: LogicalPlan): LogicalPlan = {
     val newPlan = plan transformDown {
       case currentFragment =>
-        lookupCachedData(currentFragment)
-          .map(_.cachedRepresentation.withOutput(currentFragment.output))
-          .getOrElse(currentFragment)
+        lookupCachedData(currentFragment).map { cached =>
+          val cachedPlan = cached.cachedRepresentation.withOutput(currentFragment.output)
+          currentFragment match {
+            case hint: ResolvedHint => ResolvedHint(cachedPlan, hint.hints)
+            case _ => cachedPlan
+          }
+        }.getOrElse(currentFragment)
     }
 
     newPlan transformAllExpressions {

http://git-wip-us.apache.org/repos/asf/spark/blob/613c2903/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
index 0bcd54e..1704bc8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
@@ -109,6 +109,19 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils {
     }
   }
 
+  test("broadcast hint is retained after using the cached data") {
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+      val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value")
+      val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value")
+      df2.cache()
+      val df3 = df1.join(broadcast(df2), Seq("key"), "inner")
+      val numBroadCastHashJoin = df3.queryExecution.executedPlan.collect {
+        case b: BroadcastHashJoinExec => b
+      }.size
+      assert(numBroadCastHashJoin === 1)
+    }
+  }
+
   test("broadcast hint isn't propagated after a join") {
     withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
       val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org