You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/07/16 05:37:59 UTC

[GitHub] [spark] viirya commented on a change in pull request #29107: [SPARK-32308][SQL] Move by-name resolution logic of unionByName from API code to analysis phase

viirya commented on a change in pull request #29107:
URL: https://github.com/apache/spark/pull/29107#discussion_r455521016



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
##########
@@ -1099,6 +1101,64 @@ object TypeCoercion {
         DateSub(l, Literal(days))
     }
   }
+
+  /**
+   * Coerces different children of Union to a common set of columns. Note that this must be
+   * run before `WidenSetOperationTypes`, because `WidenSetOperationTypes` should be run on
+   * columns that have already been correctly resolved by name.
+   */
+  object UnionCoercion extends TypeCoercionRule {
+    private def unionTwoSides(
+        left: LogicalPlan, right: LogicalPlan, allowMissingCol: Boolean): LogicalPlan = {
+      val resolver = SQLConf.get.resolver
+      val leftOutputAttrs = left.output
+      val rightOutputAttrs = right.output
+
+      // Builds a project list for `right` in `left` output order, matching names via `resolver`
+      val rightProjectList = leftOutputAttrs.map { lattr =>
+        rightOutputAttrs.find { rattr => resolver(lattr.name, rattr.name) }.getOrElse {
+          if (allowMissingCol) {
+            Alias(Literal(null, lattr.dataType), lattr.name)()
+          } else {
+            throw new AnalysisException(
+              s"""Cannot resolve column name "${lattr.name}" among """ +
+                s"""(${rightOutputAttrs.map(_.name).mkString(", ")})""")
+          }
+        }
+      }
+
+      // Appends unmatched `right` attributes; delegates failure checks to `CheckAnalysis`
+      val notFoundAttrs = rightOutputAttrs.diff(rightProjectList)
+      val rightChild = Project(rightProjectList ++ notFoundAttrs, right)
+
+      // Builds a project for `left` that null-fills the columns found only in `right`, if
+      // allowing missing columns.
+      val leftChild = if (allowMissingCol) {
+        val missingAttrs = notFoundAttrs.map { attr =>
+          Alias(Literal(null, attr.dataType), attr.name)()
+        }
+        if (missingAttrs.nonEmpty) {
+          Project(leftOutputAttrs ++ missingAttrs, left)
+        } else {
+          left
+        }
+      } else {
+        left
+      }
+      Union(leftChild, rightChild)
+    }
+
+    override protected def coerceTypes(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
+      case e if !e.childrenResolved => e
+
+      case Union(children, byName, allowMissingCol)
+          if byName =>
+        val union = children.reduceLeft { (left: LogicalPlan, right: LogicalPlan) =>
+          unionTwoSides(left, right, allowMissingCol)

Review comment:
       If it doesn't look proper after rethinking, we can also move it to another rule or create a new rule.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org