You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2019/05/15 22:48:25 UTC
[spark] branch master updated: [SPARK-27674][SQL] the hint should
not be dropped after cache lookup
This is an automated email from the ASF dual-hosted git repository.
lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 3e30a98 [SPARK-27674][SQL] the hint should not be dropped after cache lookup
3e30a98 is described below
commit 3e30a988102e162f2702ae223312763a0bdc15eb
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Wed May 15 15:47:52 2019 -0700
[SPARK-27674][SQL] the hint should not be dropped after cache lookup
## What changes were proposed in this pull request?
This is a followup of https://github.com/apache/spark/pull/20365 .
#20365 fixed this problem when the hint node is a root node. This PR fixes this problem for all the cases.
## How was this patch tested?
A new test was added.
Closes #24580 from cloud-fan/bug.
Authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: gatorsmile <ga...@gmail.com>
---
.../catalyst/optimizer/EliminateResolvedHint.scala | 2 +-
.../apache/spark/sql/execution/CacheManager.scala | 22 +++++----
.../org/apache/spark/sql/CachedTableSuite.scala | 56 ++++++++++++++++------
3 files changed, 54 insertions(+), 26 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/EliminateResolvedHint.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/EliminateResolvedHint.scala
index 5586690..aebd660 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/EliminateResolvedHint.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/EliminateResolvedHint.scala
@@ -56,7 +56,7 @@ object EliminateResolvedHint extends Rule[LogicalPlan] {
* in this method will be cleaned up later by this rule, and may emit warnings depending on the
* configurations.
*/
- private def extractHintsFromPlan(plan: LogicalPlan): (LogicalPlan, Seq[HintInfo]) = {
+ private[sql] def extractHintsFromPlan(plan: LogicalPlan): (LogicalPlan, Seq[HintInfo]) = {
plan match {
case h: ResolvedHint =>
val (plan, hints) = extractHintsFromPlan(h.child)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index b3c253b..a13e6ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -23,7 +23,8 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Dataset, SparkSession}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SubqueryExpression}
+import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint
import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan, ResolvedHint}
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.execution.command.CommandUtils
@@ -212,17 +213,18 @@ class CacheManager extends Logging {
def useCachedData(plan: LogicalPlan): LogicalPlan = {
val newPlan = plan transformDown {
case command: IgnoreCachedData => command
- // Do not lookup the cache by hint node. Hint node is special, we should ignore it when
- // canonicalizing plans, so that plans which are same except hint can hit the same cache.
- // However, we also want to keep the hint info after cache lookup. Here we skip the hint
- // node, so that the returned caching plan won't replace the hint node and drop the hint info
- // from the original plan.
- case hint: ResolvedHint => hint
case currentFragment =>
- lookupCachedData(currentFragment)
- .map(_.cachedRepresentation.withOutput(currentFragment.output))
- .getOrElse(currentFragment)
+ lookupCachedData(currentFragment).map { cached =>
+ // After cache lookup, we should still keep the hints from the input plan.
+ val hints = EliminateResolvedHint.extractHintsFromPlan(currentFragment)._2
+ val cachedPlan = cached.cachedRepresentation.withOutput(currentFragment.output)
+ // The returned hint list is in top-down order, we should create the hint nodes from
+ // right to left.
+ hints.foldRight[LogicalPlan](cachedPlan) { case (hint, p) =>
+ ResolvedHint(p, hint)
+ }
+ }.getOrElse(currentFragment)
}
newPlan transformAllExpressions {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 76350ad..62e77bf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.executor.DataReadMethod.DataReadMethod
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
-import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, Join}
+import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, Join, JoinStrategyHint, SHUFFLE_HASH}
import org.apache.spark.sql.execution.{RDDScanExec, SparkPlan}
import org.apache.spark.sql.execution.columnar._
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
@@ -938,23 +938,49 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
}
}
- test("Cache should respect the broadcast hint") {
- val df = broadcast(spark.range(1000)).cache()
- val df2 = spark.range(1000).cache()
- df.count()
- df2.count()
+ test("Cache should respect the hint") {
+ def testHint(df: Dataset[_], expectedHint: JoinStrategyHint): Unit = {
+ val df2 = spark.range(2000).cache()
+ df2.count()
- // Test the broadcast hint.
- val joinPlan = df.join(df2, "id").queryExecution.optimizedPlan
- val hint = joinPlan.collect {
- case Join(_, _, _, _, hint) => hint
+ def checkHintExists(): Unit = {
+ // Test the broadcast hint.
+ val joinPlan = df.join(df2, "id").queryExecution.optimizedPlan
+ val joinHints = joinPlan.collect {
+ case Join(_, _, _, _, hint) => hint
+ }
+ assert(joinHints.size == 1)
+ assert(joinHints(0).leftHint.get.strategy.contains(expectedHint))
+ assert(joinHints(0).rightHint.isEmpty)
+ }
+
+ // Make sure the hint does exist when `df` is not cached.
+ checkHintExists()
+
+ df.cache()
+ try {
+ df.count()
+ // Make sure the hint still exists when `df` is cached.
+ checkHintExists()
+ } finally {
+ // Clean-up
+ df.unpersist()
+ }
}
- assert(hint.size == 1)
- assert(hint(0).leftHint.get.strategy.contains(BROADCAST))
- assert(hint(0).rightHint.isEmpty)
- // Clean-up
- df.unpersist()
+ // The hint is the root node
+ testHint(broadcast(spark.range(1000)), BROADCAST)
+ // The hint is under subquery alias
+ testHint(broadcast(spark.range(1000)).as("df"), BROADCAST)
+ // The hint is under filter
+ testHint(broadcast(spark.range(1000)).filter($"id" > 100), BROADCAST)
+ // If there are 2 adjacent hints, the top one takes effect.
+ testHint(
+ spark.range(1000)
+ .hint("SHUFFLE_MERGE")
+ .hint("SHUFFLE_HASH")
+ .as("df"),
+ SHUFFLE_HASH)
}
test("analyzes column statistics in cached query") {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org