You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/01/20 13:38:04 UTC
[spark] branch branch-3.0 updated: [SPARK-34178][SQL] Copy tags for
the new node created by MultiInstanceRelation.newInstance
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 89443ab [SPARK-34178][SQL] Copy tags for the new node created by MultiInstanceRelation.newInstance
89443ab is described below
commit 89443ab1118b0e07acd639609094961f783b01e1
Author: yi.wu <yi...@databricks.com>
AuthorDate: Wed Jan 20 13:36:14 2021 +0000
[SPARK-34178][SQL] Copy tags for the new node created by MultiInstanceRelation.newInstance
### What changes were proposed in this pull request?
Call `copyTagsFrom` for the new node created by `MultiInstanceRelation.newInstance()`.
### Why are the changes needed?
```scala
val df = spark.range(2)
df.join(df, df("id") <=> df("id")).show()
```
For this query, it's supposed to be non-ambiguous join by the rule `DetectAmbiguousSelfJoin` because of the same attribute reference in the condition:
https://github.com/apache/spark/blob/537a49fc0966b0b289b67ac9c6ea20093165b0da/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala#L125
However, `DetectAmbiguousSelfJoin` can not apply this prediction due to the right side plan doesn't contain the dataset_id TreeNodeTag, which is missing after `MultiInstanceRelation.newInstance`. That's why we should preserve the tags info for the copied node.
Fortunately, the query is still considered as non-ambiguous join because `DetectAmbiguousSelfJoin` only checks the left side plan and the reference is the same as the left side plan. However, this's not the expected behavior but only a coincidence.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Updated a unit test
Closes #31260 from Ngone51/fix-missing-tags.
Authored-by: yi.wu <yi...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit f4989772229e2ba35f1d005727b7d4d9f1369895)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 6 +++++-
.../scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala | 2 +-
.../test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala | 12 +++++++++++-
3 files changed, 17 insertions(+), 3 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index fbe6041..2dbabfc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1012,7 +1012,10 @@ class Analyzer(
}
val key = catalog.name +: ident.namespace :+ ident.name
AnalysisContext.get.relationCache.get(key).map(_.transform {
- case multi: MultiInstanceRelation => multi.newInstance()
+ case multi: MultiInstanceRelation =>
+ val newRelation = multi.newInstance()
+ newRelation.copyTagsFrom(multi)
+ newRelation
}).orElse {
loaded.foreach(AnalysisContext.get.relationCache.update(key, _))
loaded
@@ -1164,6 +1167,7 @@ class Analyzer(
case oldVersion: MultiInstanceRelation
if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
val newVersion = oldVersion.newInstance()
+ newVersion.copyTagsFrom(oldVersion)
Seq((oldVersion, newVersion))
case oldVersion: SerializeFromObject
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 4dc3627..68bbd1b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -91,7 +91,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
*/
private val tags: mutable.Map[TreeNodeTag[_], Any] = mutable.Map.empty
- protected def copyTagsFrom(other: BaseType): Unit = {
+ def copyTagsFrom(other: BaseType): Unit = {
// SPARK-32753: it only makes sense to copy tags to a new node
// but it's too expensive to detect other cases likes node removal
// so we make a compromise here to copy tags to node with no tags
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index a49f95f..0cf81b4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, Filter, HintInfo,
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.analysis.DetectAmbiguousSelfJoin.LogicalPlanWithDatasetId
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
@@ -264,7 +265,16 @@ class DataFrameJoinSuite extends QueryTest
withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "false") {
val df = spark.range(2)
// this throws an exception before the fix
- df.join(df, df("id") <=> df("id")).queryExecution.optimizedPlan
+ val plan = df.join(df, df("id") <=> df("id")).queryExecution.optimizedPlan
+
+ plan match {
+ // SPARK-34178: we can't match the plan before the fix due to
+ // the right side plan doesn't contains dataset id.
+ case Join(
+ LogicalPlanWithDatasetId(_, leftId),
+ LogicalPlanWithDatasetId(_, rightId), _, _, _) =>
+ assert(leftId === rightId)
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org