You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/07/26 16:19:43 UTC

[GitHub] [spark] HyukjinKwon commented on a change in pull request #29166: [SPARK-32280][SPARK-32372][SQL] ResolveReferences.dedupRight should only rewrite attributes for ancestor nodes of the conflict plan

HyukjinKwon commented on a change in pull request #29166:
URL: https://github.com/apache/spark/pull/29166#discussion_r460545971



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -1237,20 +1250,50 @@ class Analyzer(
       if (conflictPlans.isEmpty) {
         right
       } else {
-        val attributeRewrites = AttributeMap(conflictPlans.flatMap {
-          case (oldRelation, newRelation) => oldRelation.output.zip(newRelation.output)})
-        val conflictPlanMap = conflictPlans.toMap
-        // transformDown so that we can replace all the old Relations in one turn due to
-        // the reason that `conflictPlans` are also collected in pre-order.
-        right transformDown {
-          case r => conflictPlanMap.getOrElse(r, r)
-        } transformUp {
-          case other => other transformExpressions {
+        rewritePlan(right, conflictPlans.toMap)._1
+      }
+    }
+
+    private def rewritePlan(plan: LogicalPlan, conflictPlanMap: Map[LogicalPlan, LogicalPlan])
+      : (LogicalPlan, Seq[(Attribute, Attribute)]) = {
+      if (conflictPlanMap.contains(plan)) {
+        // If the plan is the one that conflict the with left one, we'd
+        // just replace it with the new plan and collect the rewrite
+        // attributes for the parent node.
+        val newRelation = conflictPlanMap(plan)
+        newRelation -> plan.output.zip(newRelation.output)
+      } else {
+        val attrMapping = new mutable.ArrayBuffer[(Attribute, Attribute)]()
+        val newPlan = plan.mapChildren { child =>
+          // If not, we'd rewrite child plan recursively until we find the
+          // conflict node or reach the leaf node.
+          val (newChild, childAttrMapping) = rewritePlan(child, conflictPlanMap)
+          attrMapping ++= childAttrMapping.filter { case (oldAttr, _) =>
+            // `attrMapping` is not only used to replace the attributes of the current `plan`,
+            // but also to be propagated to the parent plans of the current `plan`. Therefore,
+            // the `oldAttr` must be part of either `plan.references` (so that it can be used to
+            // replace attributes of the current `plan`) or `plan.outputSet` (so that it can be
+            // used by those parent plans).
+            (plan.outputSet ++ plan.references).contains(oldAttr)
+          }
+          newChild
+        }
+
+        if (attrMapping.isEmpty) {
+          newPlan -> attrMapping

Review comment:
       Thanks for letting me know. +1




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org