Posted to commits@spark.apache.org by li...@apache.org on 2019/05/15 22:48:25 UTC

[spark] branch master updated: [SPARK-27674][SQL] the hint should not be dropped after cache lookup

This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e30a98  [SPARK-27674][SQL] the hint should not be dropped after cache lookup
3e30a98 is described below

commit 3e30a988102e162f2702ae223312763a0bdc15eb
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Wed May 15 15:47:52 2019 -0700

    [SPARK-27674][SQL] the hint should not be dropped after cache lookup
    
    ## What changes were proposed in this pull request?
    
    This is a follow-up of https://github.com/apache/spark/pull/20365.
    
    #20365 fixed this problem when the hint node is the root node of the plan. This PR
    fixes it for all cases, e.g. when the hint node sits under a subquery alias or a filter.
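    
    To illustrate (a hypothetical sketch distilled from the new test cases, not
    part of the original change; it assumes a SparkSession in scope named `spark`,
    as in the spark-shell), a query shape that used to lose the hint:
    
        import org.apache.spark.sql.functions.broadcast
        import spark.implicits._  // for the $"id" column syntax
        
        // The BROADCAST hint sits below a Filter, so it is not the root node.
        val df = broadcast(spark.range(1000)).filter($"id" > 100)
        df.cache()
        df.count()
        // Before this fix, the cache lookup replaced the matching plan fragment
        // wholesale, so the ResolvedHint node and its hint info were dropped.
        val joined = df.join(spark.range(1000), "id")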
    
    ## How was this patch tested?
    
    a new test
    
    Closes #24580 from cloud-fan/bug.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: gatorsmile <ga...@gmail.com>
---
 .../catalyst/optimizer/EliminateResolvedHint.scala |  2 +-
 .../apache/spark/sql/execution/CacheManager.scala  | 22 +++++----
 .../org/apache/spark/sql/CachedTableSuite.scala    | 56 ++++++++++++++++------
 3 files changed, 54 insertions(+), 26 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/EliminateResolvedHint.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/EliminateResolvedHint.scala
index 5586690..aebd660 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/EliminateResolvedHint.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/EliminateResolvedHint.scala
@@ -56,7 +56,7 @@ object EliminateResolvedHint extends Rule[LogicalPlan] {
    * in this method will be cleaned up later by this rule, and may emit warnings depending on the
    * configurations.
    */
-  private def extractHintsFromPlan(plan: LogicalPlan): (LogicalPlan, Seq[HintInfo]) = {
+  private[sql] def extractHintsFromPlan(plan: LogicalPlan): (LogicalPlan, Seq[HintInfo]) = {
     plan match {
       case h: ResolvedHint =>
         val (plan, hints) = extractHintsFromPlan(h.child)
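
For context, here is a simplified model of what extractHintsFromPlan computes (a
sketch of the ResolvedHint case only, under a hypothetical name; the real rule also
looks through other node types):

    import org.apache.spark.sql.catalyst.plans.logical.{HintInfo, LogicalPlan, ResolvedHint}

    // Peel consecutive ResolvedHint nodes off the top of a plan, returning the
    // stripped plan and the collected hints in top-down (outermost-first) order.
    def extractHints(plan: LogicalPlan): (LogicalPlan, Seq[HintInfo]) = plan match {
      case h: ResolvedHint =>
        val (child, hints) = extractHints(h.child)
        (child, h.hints +: hints)
      case p => (p, Seq.empty)
    }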
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index b3c253b..a13e6ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -23,7 +23,8 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Dataset, SparkSession}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SubqueryExpression}
+import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint
 import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan, ResolvedHint}
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.command.CommandUtils
@@ -212,17 +213,18 @@ class CacheManager extends Logging {
   def useCachedData(plan: LogicalPlan): LogicalPlan = {
     val newPlan = plan transformDown {
       case command: IgnoreCachedData => command
-      // Do not lookup the cache by hint node. Hint node is special, we should ignore it when
-      // canonicalizing plans, so that plans which are same except hint can hit the same cache.
-      // However, we also want to keep the hint info after cache lookup. Here we skip the hint
-      // node, so that the returned caching plan won't replace the hint node and drop the hint info
-      // from the original plan.
-      case hint: ResolvedHint => hint
 
       case currentFragment =>
-        lookupCachedData(currentFragment)
-          .map(_.cachedRepresentation.withOutput(currentFragment.output))
-          .getOrElse(currentFragment)
+        lookupCachedData(currentFragment).map { cached =>
+          // After cache lookup, we should still keep the hints from the input plan.
+          val hints = EliminateResolvedHint.extractHintsFromPlan(currentFragment)._2
+          val cachedPlan = cached.cachedRepresentation.withOutput(currentFragment.output)
+          // The returned hint list is in top-down order, so we should create the
+          // hint nodes from right to left.
+          hints.foldRight[LogicalPlan](cachedPlan) { case (hint, p) =>
+            ResolvedHint(p, hint)
+          }
+        }.getOrElse(currentFragment)
     }
 
     newPlan transformAllExpressions {
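
To see why the hints are re-attached with foldRight, here is a minimal stand-alone
sketch (plain Scala, with strings standing in for plan nodes; not code from this
commit). extractHintsFromPlan returns hints in top-down order, so folding from the
right wraps the bottom-most hint first and leaves the top-most hint outermost,
restoring the original nesting:

    val hints = Seq("h1", "h2")  // top-down order: h1 was above h2 in the plan
    val rebuilt = hints.foldRight("cachedPlan") { case (hint, p) =>
      s"ResolvedHint($p, $hint)"
    }
    // rebuilt == "ResolvedHint(ResolvedHint(cachedPlan, h2), h1)"
    println(rebuilt)
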
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 76350ad..62e77bf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.executor.DataReadMethod.DataReadMethod
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
-import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, Join}
+import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, Join, JoinStrategyHint, SHUFFLE_HASH}
 import org.apache.spark.sql.execution.{RDDScanExec, SparkPlan}
 import org.apache.spark.sql.execution.columnar._
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
@@ -938,23 +938,49 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     }
   }
 
-  test("Cache should respect the broadcast hint") {
-    val df = broadcast(spark.range(1000)).cache()
-    val df2 = spark.range(1000).cache()
-    df.count()
-    df2.count()
+  test("Cache should respect the hint") {
+    def testHint(df: Dataset[_], expectedHint: JoinStrategyHint): Unit = {
+      val df2 = spark.range(2000).cache()
+      df2.count()
 
-    // Test the broadcast hint.
-    val joinPlan = df.join(df2, "id").queryExecution.optimizedPlan
-    val hint = joinPlan.collect {
-      case Join(_, _, _, _, hint) => hint
+      def checkHintExists(): Unit = {
+        // Check the hint attached to the join in the optimized plan.
+        val joinPlan = df.join(df2, "id").queryExecution.optimizedPlan
+        val joinHints = joinPlan.collect {
+          case Join(_, _, _, _, hint) => hint
+        }
+        assert(joinHints.size == 1)
+        assert(joinHints(0).leftHint.get.strategy.contains(expectedHint))
+        assert(joinHints(0).rightHint.isEmpty)
+      }
+
+      // Make sure the hint does exist when `df` is not cached.
+      checkHintExists()
+
+      df.cache()
+      try {
+        df.count()
+        // Make sure the hint still exists when `df` is cached.
+        checkHintExists()
+      } finally {
+        // Clean-up
+        df.unpersist()
+      }
     }
-    assert(hint.size == 1)
-    assert(hint(0).leftHint.get.strategy.contains(BROADCAST))
-    assert(hint(0).rightHint.isEmpty)
 
-    // Clean-up
-    df.unpersist()
+    // The hint is the root node
+    testHint(broadcast(spark.range(1000)), BROADCAST)
+    // The hint is under subquery alias
+    testHint(broadcast(spark.range(1000)).as("df"), BROADCAST)
+    // The hint is under filter
+    testHint(broadcast(spark.range(1000)).filter($"id" > 100), BROADCAST)
+    // If there are 2 adjacent hints, the top one takes effect.
+    testHint(
+      spark.range(1000)
+        .hint("SHUFFLE_MERGE")
+        .hint("SHUFFLE_HASH")
+        .as("df"),
+      SHUFFLE_HASH)
   }
 
   test("analyzes column statistics in cached query") {

