You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ya...@apache.org on 2020/09/08 23:27:03 UTC

[spark] branch branch-3.0 updated: [SPARK-32638][SQL][3.0] Corrects references when adding aliases in WidenSetOperationTypes

This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 3f20f14  [SPARK-32638][SQL][3.0] Corrects references when adding aliases in WidenSetOperationTypes
3f20f14 is described below

commit 3f20f1413ccb1f685b96de96e5e705db6a70e058
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Wed Sep 9 08:21:59 2020 +0900

    [SPARK-32638][SQL][3.0] Corrects references when adding aliases in WidenSetOperationTypes
    
    ### What changes were proposed in this pull request?
    
    This PR intends to fix a bug where references can be missing when adding aliases to widen data types in `WidenSetOperationTypes`. For example,
    ```
    CREATE OR REPLACE TEMPORARY VIEW t3 AS VALUES (decimal(1)) tbl(v);
    SELECT t.v FROM (
      SELECT v FROM t3
      UNION ALL
      SELECT v + v AS v FROM t3
    ) t;
    
    org.apache.spark.sql.AnalysisException: Resolved attribute(s) v#1 missing from v#3 in operator !Project [v#1]. Attribute(s) with the same name appear in the operation: v. Please check if the right attribute(s) are used.;;
    !Project [v#1]  <------ the reference got missing
    +- SubqueryAlias t
       +- Union
          :- Project [cast(v#1 as decimal(11,0)) AS v#3]
          :  +- Project [v#1]
          :     +- SubqueryAlias t3
          :        +- SubqueryAlias tbl
          :           +- LocalRelation [v#1]
          +- Project [v#2]
             +- Project [CheckOverflow((promote_precision(cast(v#1 as decimal(11,0))) + promote_precision(cast(v#1 as decimal(11,0)))), DecimalType(11,0), true) AS v#2]
                +- SubqueryAlias t3
                   +- SubqueryAlias tbl
                      +- LocalRelation [v#1]
    ```
    In the case, `WidenSetOperationTypes` added the alias `cast(v#1 as decimal(11,0)) AS v#3`, then the reference in the top `Project` got missing. This PR correct the reference (`exprId` and widen `dataType`) after adding aliases in the rule.
    
    This backport for 3.0 comes from #29485 and #29643
    
    ### Why are the changes needed?
    
    bugfixes
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Added unit tests
    
    Closes #29680 from maropu/SPARK-32638-BRANCH3.0.
    
    Lead-authored-by: Wenchen Fan <we...@databricks.com>
    Co-authored-by: Takeshi Yamamuro <ya...@apache.org>
    Signed-off-by: Takeshi Yamamuro <ya...@apache.org>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 109 ++-------------------
 .../spark/sql/catalyst/analysis/TypeCoercion.scala |  59 +++++++----
 .../spark/sql/catalyst/plans/QueryPlan.scala       |  85 ++++++++++++++++
 .../catalyst/plans/logical/AnalysisHelper.scala    |  15 ++-
 .../sql/catalyst/analysis/TypeCoercionSuite.scala  |  18 +++-
 .../src/test/resources/sql-tests/inputs/except.sql |  19 ++++
 .../resources/sql-tests/inputs/intersect-all.sql   |  15 +++
 .../src/test/resources/sql-tests/inputs/union.sql  |  14 +++
 .../resources/sql-tests/results/except.sql.out     |  58 ++++++++++-
 .../sql-tests/results/intersect-all.sql.out        |  42 +++++++-
 .../test/resources/sql-tests/results/union.sql.out |  43 +++++++-
 11 files changed, 348 insertions(+), 129 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 89454c2..729d316 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1239,108 +1239,13 @@ class Analyzer(
       if (conflictPlans.isEmpty) {
         right
       } else {
-        rewritePlan(right, conflictPlans.toMap)._1
-      }
-    }
-
-    private def rewritePlan(plan: LogicalPlan, conflictPlanMap: Map[LogicalPlan, LogicalPlan])
-      : (LogicalPlan, Seq[(Attribute, Attribute)]) = {
-      if (conflictPlanMap.contains(plan)) {
-        // If the plan is the one that conflict the with left one, we'd
-        // just replace it with the new plan and collect the rewrite
-        // attributes for the parent node.
-        val newRelation = conflictPlanMap(plan)
-        newRelation -> plan.output.zip(newRelation.output)
-      } else {
-        val attrMapping = new mutable.ArrayBuffer[(Attribute, Attribute)]()
-        val newPlan = plan.mapChildren { child =>
-          // If not, we'd rewrite child plan recursively until we find the
-          // conflict node or reach the leaf node.
-          val (newChild, childAttrMapping) = rewritePlan(child, conflictPlanMap)
-          attrMapping ++= childAttrMapping.filter { case (oldAttr, _) =>
-            // `attrMapping` is not only used to replace the attributes of the current `plan`,
-            // but also to be propagated to the parent plans of the current `plan`. Therefore,
-            // the `oldAttr` must be part of either `plan.references` (so that it can be used to
-            // replace attributes of the current `plan`) or `plan.outputSet` (so that it can be
-            // used by those parent plans).
-            (plan.outputSet ++ plan.references).contains(oldAttr)
-          }
-          newChild
-        }
-
-        if (attrMapping.isEmpty) {
-          newPlan -> attrMapping
-        } else {
-          assert(!attrMapping.groupBy(_._1.exprId)
-            .exists(_._2.map(_._2.exprId).distinct.length > 1),
-            "Found duplicate rewrite attributes")
-          val attributeRewrites = AttributeMap(attrMapping)
-          // Using attrMapping from the children plans to rewrite their parent node.
-          // Note that we shouldn't rewrite a node using attrMapping from its sibling nodes.
-          newPlan.transformExpressions {
-            case a: Attribute =>
-              dedupAttr(a, attributeRewrites)
-            case s: SubqueryExpression =>
-              s.withNewPlan(dedupOuterReferencesInSubquery(s.plan, attributeRewrites))
-          } -> attrMapping
-        }
-      }
-    }
-
-    private def dedupAttr(attr: Attribute, attrMap: AttributeMap[Attribute]): Attribute = {
-      val exprId = attrMap.getOrElse(attr, attr).exprId
-      attr.withExprId(exprId)
-    }
-
-    /**
-     * The outer plan may have been de-duplicated and the function below updates the
-     * outer references to refer to the de-duplicated attributes.
-     *
-     * For example (SQL):
-     * {{{
-     *   SELECT * FROM t1
-     *   INTERSECT
-     *   SELECT * FROM t1
-     *   WHERE EXISTS (SELECT 1
-     *                 FROM t2
-     *                 WHERE t1.c1 = t2.c1)
-     * }}}
-     * Plan before resolveReference rule.
-     *    'Intersect
-     *    :- Project [c1#245, c2#246]
-     *    :  +- SubqueryAlias t1
-     *    :     +- Relation[c1#245,c2#246] parquet
-     *    +- 'Project [*]
-     *       +- Filter exists#257 [c1#245]
-     *       :  +- Project [1 AS 1#258]
-     *       :     +- Filter (outer(c1#245) = c1#251)
-     *       :        +- SubqueryAlias t2
-     *       :           +- Relation[c1#251,c2#252] parquet
-     *       +- SubqueryAlias t1
-     *          +- Relation[c1#245,c2#246] parquet
-     * Plan after the resolveReference rule.
-     *    Intersect
-     *    :- Project [c1#245, c2#246]
-     *    :  +- SubqueryAlias t1
-     *    :     +- Relation[c1#245,c2#246] parquet
-     *    +- Project [c1#259, c2#260]
-     *       +- Filter exists#257 [c1#259]
-     *       :  +- Project [1 AS 1#258]
-     *       :     +- Filter (outer(c1#259) = c1#251) => Updated
-     *       :        +- SubqueryAlias t2
-     *       :           +- Relation[c1#251,c2#252] parquet
-     *       +- SubqueryAlias t1
-     *          +- Relation[c1#259,c2#260] parquet  => Outer plan's attributes are de-duplicated.
-     */
-    private def dedupOuterReferencesInSubquery(
-        plan: LogicalPlan,
-        attrMap: AttributeMap[Attribute]): LogicalPlan = {
-      plan transformDown { case currentFragment =>
-        currentFragment transformExpressions {
-          case OuterReference(a: Attribute) =>
-            OuterReference(dedupAttr(a, attrMap))
-          case s: SubqueryExpression =>
-            s.withNewPlan(dedupOuterReferencesInSubquery(s.plan, attrMap))
+        val planMapping = conflictPlans.toMap
+        right.transformUpWithNewOutput {
+          case oldPlan =>
+            val newPlanOpt = planMapping.get(oldPlan)
+            newPlanOpt.map { newPlan =>
+              newPlan -> oldPlan.output.zip(newPlan.output)
+            }.getOrElse(oldPlan -> Nil)
         }
       }
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
index a6f8e12..df8835d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
@@ -326,25 +326,42 @@ object TypeCoercion {
    *
    * This rule is only applied to Union/Except/Intersect
    */
-  object WidenSetOperationTypes extends Rule[LogicalPlan] {
-
-    def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
-      case s @ Except(left, right, isAll) if s.childrenResolved &&
-        left.output.length == right.output.length && !s.resolved =>
-        val newChildren: Seq[LogicalPlan] = buildNewChildrenWithWiderTypes(left :: right :: Nil)
-        assert(newChildren.length == 2)
-        Except(newChildren.head, newChildren.last, isAll)
-
-      case s @ Intersect(left, right, isAll) if s.childrenResolved &&
-        left.output.length == right.output.length && !s.resolved =>
-        val newChildren: Seq[LogicalPlan] = buildNewChildrenWithWiderTypes(left :: right :: Nil)
-        assert(newChildren.length == 2)
-        Intersect(newChildren.head, newChildren.last, isAll)
-
-      case s: Union if s.childrenResolved &&
+  object WidenSetOperationTypes extends TypeCoercionRule {
+
+    override protected def coerceTypes(plan: LogicalPlan): LogicalPlan = {
+      plan resolveOperatorsUpWithNewOutput {
+        case s @ Except(left, right, isAll) if s.childrenResolved &&
+          left.output.length == right.output.length && !s.resolved =>
+          val newChildren: Seq[LogicalPlan] = buildNewChildrenWithWiderTypes(left :: right :: Nil)
+          if (newChildren.isEmpty) {
+            s -> Nil
+          } else {
+            assert(newChildren.length == 2)
+            val attrMapping = left.output.zip(newChildren.head.output)
+            Except(newChildren.head, newChildren.last, isAll) -> attrMapping
+          }
+
+        case s @ Intersect(left, right, isAll) if s.childrenResolved &&
+          left.output.length == right.output.length && !s.resolved =>
+          val newChildren: Seq[LogicalPlan] = buildNewChildrenWithWiderTypes(left :: right :: Nil)
+          if (newChildren.isEmpty) {
+            s -> Nil
+          } else {
+            assert(newChildren.length == 2)
+            val attrMapping = left.output.zip(newChildren.head.output)
+            Intersect(newChildren.head, newChildren.last, isAll) -> attrMapping
+          }
+
+        case s: Union if s.childrenResolved &&
           s.children.forall(_.output.length == s.children.head.output.length) && !s.resolved =>
-        val newChildren: Seq[LogicalPlan] = buildNewChildrenWithWiderTypes(s.children)
-        s.makeCopy(Array(newChildren))
+          val newChildren: Seq[LogicalPlan] = buildNewChildrenWithWiderTypes(s.children)
+          if (newChildren.isEmpty) {
+            s -> Nil
+          } else {
+            val attrMapping = s.children.head.output.zip(newChildren.head.output)
+            s.copy(children = newChildren) -> attrMapping
+          }
+      }
     }
 
     /** Build new children with the widest types for each attribute among all the children */
@@ -360,8 +377,7 @@ object TypeCoercion {
         // Add an extra Project if the targetTypes are different from the original types.
         children.map(widenTypes(_, targetTypes))
       } else {
-        // Unable to find a target type to widen, then just return the original set.
-        children
+        Nil
       }
     }
 
@@ -387,7 +403,8 @@ object TypeCoercion {
     /** Given a plan, add an extra project on top to widen some columns' data types. */
     private def widenTypes(plan: LogicalPlan, targetTypes: Seq[DataType]): LogicalPlan = {
       val casted = plan.output.zip(targetTypes).map {
-        case (e, dt) if e.dataType != dt => Alias(Cast(e, dt), e.name)()
+        case (e, dt) if e.dataType != dt =>
+          Alias(Cast(e, dt, Some(SQLConf.get.sessionLocalTimeZone)), e.name)()
         case (e, _) => e
       }
       Project(casted, plan)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 7133fb2..fed5df6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.plans
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, TreeNode, TreeNodeTag}
@@ -168,6 +170,89 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
     }.toSeq
   }
 
+  /**
+   * A variant of `transformUp`, which takes care of the case that the rule replaces a plan node
+   * with a new one that has different output expr IDs, by updating the attribute references in
+   * the parent nodes accordingly.
+   *
+   * @param rule the function to transform plan nodes, and return new nodes with attributes mapping
+   *             from old attributes to new attributes. The attribute mapping will be used to
+   *             rewrite attribute references in the parent nodes.
+   * @param skipCond a boolean condition to indicate if we can skip transforming a plan node to save
+   *                 time.
+   */
+  def transformUpWithNewOutput(
+      rule: PartialFunction[PlanType, (PlanType, Seq[(Attribute, Attribute)])],
+      skipCond: PlanType => Boolean = _ => false): PlanType = {
+    def rewrite(plan: PlanType): (PlanType, Seq[(Attribute, Attribute)]) = {
+      if (skipCond(plan)) {
+        plan -> Nil
+      } else {
+        val attrMapping = new mutable.ArrayBuffer[(Attribute, Attribute)]()
+        var newPlan = plan.mapChildren { child =>
+          val (newChild, childAttrMapping) = rewrite(child)
+          attrMapping ++= childAttrMapping
+          newChild
+        }
+
+        val attrMappingForCurrentPlan = attrMapping.filter {
+          // The `attrMappingForCurrentPlan` is used to replace the attributes of the
+          // current `plan`, so the `oldAttr` must be part of `plan.references`.
+          case (oldAttr, _) => plan.references.contains(oldAttr)
+        }
+
+        val (planAfterRule, newAttrMapping) = CurrentOrigin.withOrigin(origin) {
+          rule.applyOrElse(newPlan, (plan: PlanType) => plan -> Nil)
+        }
+        newPlan = planAfterRule
+
+        if (attrMappingForCurrentPlan.nonEmpty) {
+          assert(!attrMappingForCurrentPlan.groupBy(_._1.exprId)
+            .exists(_._2.map(_._2.exprId).distinct.length > 1),
+            "Found duplicate rewrite attributes")
+
+          val attributeRewrites = AttributeMap(attrMappingForCurrentPlan)
+          // Using attrMapping from the children plans to rewrite their parent node.
+          // Note that we shouldn't rewrite a node using attrMapping from its sibling nodes.
+          newPlan = newPlan.transformExpressions {
+            case a: AttributeReference =>
+              updateAttr(a, attributeRewrites)
+            case pe: PlanExpression[PlanType] =>
+              pe.withNewPlan(updateOuterReferencesInSubquery(pe.plan, attributeRewrites))
+          }
+        }
+
+        attrMapping ++= newAttrMapping.filter {
+          case (a1, a2) => a1.exprId != a2.exprId
+        }
+        newPlan -> attrMapping
+      }
+    }
+    rewrite(this)._1
+  }
+
+  private def updateAttr(attr: Attribute, attrMap: AttributeMap[Attribute]): Attribute = {
+    val exprId = attrMap.getOrElse(attr, attr).exprId
+    attr.withExprId(exprId)
+  }
+
+  /**
+   * The outer plan may have old references and the function below updates the
+   * outer references to refer to the new attributes.
+   */
+  private def updateOuterReferencesInSubquery(
+      plan: PlanType,
+      attrMap: AttributeMap[Attribute]): PlanType = {
+    plan.transformDown { case currentFragment =>
+      currentFragment.transformExpressions {
+        case OuterReference(a: AttributeReference) =>
+          OuterReference(updateAttr(a, attrMap))
+        case pe: PlanExpression[PlanType] =>
+          pe.withNewPlan(updateOuterReferencesInSubquery(pe.plan, attrMap))
+      }
+    }
+  }
+
   lazy val schema: StructType = StructType.fromAttributes(output)
 
   /** Returns the output schema in the tree format. */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala
index 9404a80..30447db 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.plans.logical
 
 import org.apache.spark.sql.catalyst.analysis.CheckAnalysis
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, TreeNode}
 import org.apache.spark.util.Utils
@@ -121,6 +121,19 @@ trait AnalysisHelper extends QueryPlan[LogicalPlan] { self: LogicalPlan =>
   }
 
   /**
+   * A variant of `transformUpWithNewOutput`, which skips touching already analyzed plan.
+   */
+  def resolveOperatorsUpWithNewOutput(
+      rule: PartialFunction[LogicalPlan, (LogicalPlan, Seq[(Attribute, Attribute)])])
+  : LogicalPlan = {
+    if (!analyzed) {
+      transformUpWithNewOutput(rule, skipCond = _.analyzed)
+    } else {
+      self
+    }
+  }
+
+  /**
    * Recursively transforms the expressions of a tree, skipping nodes that have already
    * been analyzed.
    */
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
index 1ea1ddb..7b80de9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
@@ -21,13 +21,12 @@ import java.sql.Timestamp
 
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion._
 import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.CalendarInterval
 
 class TypeCoercionSuite extends AnalysisTest {
   import TypeCoercionSuite._
@@ -1417,6 +1416,21 @@ class TypeCoercionSuite extends AnalysisTest {
     }
   }
 
+  test("SPARK-32638: corrects references when adding aliases in WidenSetOperationTypes") {
+    val t1 = LocalRelation(AttributeReference("v", DecimalType(10, 0))())
+    val t2 = LocalRelation(AttributeReference("v", DecimalType(11, 0))())
+    val p1 = t1.select(t1.output.head).as("p1")
+    val p2 = t2.select(t2.output.head).as("p2")
+    val union = p1.union(p2)
+    val wp1 = widenSetOperationTypes(union.select(p1.output.head, $"p2.v"))
+    assert(wp1.isInstanceOf[Project])
+    // The attribute `p1.output.head` should be replaced in the root `Project`.
+    assert(wp1.expressions.forall(_.find(_ == p1.output.head).isEmpty))
+    val wp2 = widenSetOperationTypes(Aggregate(Nil, sum(p1.output.head).as("v") :: Nil, union))
+    assert(wp2.isInstanceOf[Aggregate])
+    assert(wp2.missingInput.isEmpty)
+  }
+
   /**
    * There are rules that need to not fire before child expressions get resolved.
    * We use this test to make sure those rules do not fire early.
diff --git a/sql/core/src/test/resources/sql-tests/inputs/except.sql b/sql/core/src/test/resources/sql-tests/inputs/except.sql
index 1d579e6..ffdf1f4 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/except.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/except.sql
@@ -55,3 +55,22 @@ FROM   t1
 WHERE  t1.v >= (SELECT   min(t2.v)
                 FROM     t2
                 WHERE    t2.k = t1.k);
+
+-- SPARK-32638: corrects references when adding aliases in WidenSetOperationTypes
+CREATE OR REPLACE TEMPORARY VIEW t3 AS VALUES (decimal(1)) tbl(v);
+SELECT t.v FROM (
+  SELECT v FROM t3
+  EXCEPT
+  SELECT v + v AS v FROM t3
+) t;
+
+SELECT SUM(t.v) FROM (
+  SELECT v FROM t3
+  EXCEPT
+  SELECT v + v AS v FROM t3
+) t;
+
+-- Clean-up
+DROP VIEW IF EXISTS t1;
+DROP VIEW IF EXISTS t2;
+DROP VIEW IF EXISTS t3;
diff --git a/sql/core/src/test/resources/sql-tests/inputs/intersect-all.sql b/sql/core/src/test/resources/sql-tests/inputs/intersect-all.sql
index b0b2244..077caa5 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/intersect-all.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/intersect-all.sql
@@ -155,6 +155,21 @@ SELECT * FROM tab2;
 -- Restore the property
 SET spark.sql.legacy.setopsPrecedence.enabled = false;
 
+-- SPARK-32638: corrects references when adding aliases in WidenSetOperationTypes
+CREATE OR REPLACE TEMPORARY VIEW tab3 AS VALUES (decimal(1)), (decimal(2)) tbl3(v);
+SELECT t.v FROM (
+  SELECT v FROM tab3
+  INTERSECT
+  SELECT v + v AS v FROM tab3
+) t;
+
+SELECT SUM(t.v) FROM (
+  SELECT v FROM tab3
+  INTERSECT
+  SELECT v + v AS v FROM tab3
+) t;
+
 -- Clean-up 
 DROP VIEW IF EXISTS tab1;
 DROP VIEW IF EXISTS tab2;
+DROP VIEW IF EXISTS tab3;
diff --git a/sql/core/src/test/resources/sql-tests/inputs/union.sql b/sql/core/src/test/resources/sql-tests/inputs/union.sql
index 6da1b9b..8a5b6c5 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/union.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/union.sql
@@ -45,10 +45,24 @@ SELECT array(1, 2), 'str'
 UNION ALL
 SELECT array(1, 2, 3, NULL), 1;
 
+-- SPARK-32638: corrects references when adding aliases in WidenSetOperationTypes
+CREATE OR REPLACE TEMPORARY VIEW t3 AS VALUES (decimal(1)) tbl(v);
+SELECT t.v FROM (
+  SELECT v FROM t3
+  UNION ALL
+  SELECT v + v AS v FROM t3
+) t;
+
+SELECT SUM(t.v) FROM (
+  SELECT v FROM t3
+  UNION
+  SELECT v + v AS v FROM t3
+) t;
 
 -- Clean-up
 DROP VIEW IF EXISTS t1;
 DROP VIEW IF EXISTS t2;
+DROP VIEW IF EXISTS t3;
 DROP VIEW IF EXISTS p1;
 DROP VIEW IF EXISTS p2;
 DROP VIEW IF EXISTS p3;
diff --git a/sql/core/src/test/resources/sql-tests/results/except.sql.out b/sql/core/src/test/resources/sql-tests/results/except.sql.out
index 62d6952..061b122 100644
--- a/sql/core/src/test/resources/sql-tests/results/except.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/except.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 9
+-- Number of queries: 15
 
 
 -- !query
@@ -103,3 +103,59 @@ WHERE  t1.v >= (SELECT   min(t2.v)
 struct<k:string>
 -- !query output
 two
+
+
+-- !query
+CREATE OR REPLACE TEMPORARY VIEW t3 AS VALUES (decimal(1)) tbl(v)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT t.v FROM (
+  SELECT v FROM t3
+  EXCEPT
+  SELECT v + v AS v FROM t3
+) t
+-- !query schema
+struct<v:decimal(11,0)>
+-- !query output
+1
+
+
+-- !query
+SELECT SUM(t.v) FROM (
+  SELECT v FROM t3
+  EXCEPT
+  SELECT v + v AS v FROM t3
+) t
+-- !query schema
+struct<sum(v):decimal(21,0)>
+-- !query output
+1
+
+
+-- !query
+DROP VIEW IF EXISTS t1
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+DROP VIEW IF EXISTS t2
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+DROP VIEW IF EXISTS t3
+-- !query schema
+struct<>
+-- !query output
+
diff --git a/sql/core/src/test/resources/sql-tests/results/intersect-all.sql.out b/sql/core/src/test/resources/sql-tests/results/intersect-all.sql.out
index 4762082..b99f633 100644
--- a/sql/core/src/test/resources/sql-tests/results/intersect-all.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/intersect-all.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 22
+-- Number of queries: 26
 
 
 -- !query
@@ -292,6 +292,38 @@ spark.sql.legacy.setopsPrecedence.enabled	false
 
 
 -- !query
+CREATE OR REPLACE TEMPORARY VIEW tab3 AS VALUES (decimal(1)), (decimal(2)) tbl3(v)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT t.v FROM (
+  SELECT v FROM tab3
+  INTERSECT
+  SELECT v + v AS v FROM tab3
+) t
+-- !query schema
+struct<v:decimal(11,0)>
+-- !query output
+2
+
+
+-- !query
+SELECT SUM(t.v) FROM (
+  SELECT v FROM tab3
+  INTERSECT
+  SELECT v + v AS v FROM tab3
+) t
+-- !query schema
+struct<sum(v):decimal(21,0)>
+-- !query output
+2
+
+
+-- !query
 DROP VIEW IF EXISTS tab1
 -- !query schema
 struct<>
@@ -305,3 +337,11 @@ DROP VIEW IF EXISTS tab2
 struct<>
 -- !query output
 
+
+
+-- !query
+DROP VIEW IF EXISTS tab3
+-- !query schema
+struct<>
+-- !query output
+
diff --git a/sql/core/src/test/resources/sql-tests/results/union.sql.out b/sql/core/src/test/resources/sql-tests/results/union.sql.out
index 4400240..ce3c761 100644
--- a/sql/core/src/test/resources/sql-tests/results/union.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/union.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 16
+-- Number of queries: 20
 
 
 -- !query
@@ -127,6 +127,39 @@ struct<array(1, 2):array<int>,str:string>
 
 
 -- !query
+CREATE OR REPLACE TEMPORARY VIEW t3 AS VALUES (decimal(1)) tbl(v)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT t.v FROM (
+  SELECT v FROM t3
+  UNION ALL
+  SELECT v + v AS v FROM t3
+) t
+-- !query schema
+struct<v:decimal(11,0)>
+-- !query output
+1
+2
+
+
+-- !query
+SELECT SUM(t.v) FROM (
+  SELECT v FROM t3
+  UNION
+  SELECT v + v AS v FROM t3
+) t
+-- !query schema
+struct<sum(v):decimal(21,0)>
+-- !query output
+3
+
+
+-- !query
 DROP VIEW IF EXISTS t1
 -- !query schema
 struct<>
@@ -143,6 +176,14 @@ struct<>
 
 
 -- !query
+DROP VIEW IF EXISTS t3
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
 DROP VIEW IF EXISTS p1
 -- !query schema
 struct<>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org