You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/12/19 14:33:43 UTC

[GitHub] [spark] cloud-fan commented on a diff in pull request #38941: [SPARK-41498] Propagate metadata through Union

cloud-fan commented on code in PR #38941:
URL: https://github.com/apache/spark/pull/38941#discussion_r1052269541


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala:
##########
@@ -449,22 +449,54 @@ case class Union(
       AttributeSet.fromAttributeSets(children.map(_.outputSet)).size
   }
 
-  // updating nullability to make all the children consistent
-  override def output: Seq[Attribute] = {
-    children.map(_.output).transpose.map { attrs =>
-      val firstAttr = attrs.head
-      val nullable = attrs.exists(_.nullable)
-      val newDt = attrs.map(_.dataType).reduce(StructType.unionLikeMerge)
-      if (firstAttr.dataType == newDt) {
-        firstAttr.withNullability(nullable)
-      } else {
-        AttributeReference(firstAttr.name, newDt, nullable, firstAttr.metadata)(
-          firstAttr.exprId, firstAttr.qualifier)
-      }
+  /**
+   * Merges a sequence of attributes to have a common datatype and updates the
+   * nullability to be consistent with the attributes being merged.
+   */
+  private def mergeAttributes(attributes: Seq[Attribute]): Attribute = {
+    val firstAttr = attributes.head
+    val nullable = attributes.exists(_.nullable)
+    val newDt = attributes.map(_.dataType).reduce(StructType.unionLikeMerge)
+    if (firstAttr.dataType == newDt) {
+      firstAttr.withNullability(nullable)
+    } else {
+      AttributeReference(firstAttr.name, newDt, nullable, firstAttr.metadata)(
+        firstAttr.exprId, firstAttr.qualifier)
     }
   }
 
-  override def metadataOutput: Seq[Attribute] = Nil
+  override def output: Seq[Attribute] = children.map(_.output).transpose.map(mergeAttributes)
+
+  override def metadataOutput: Seq[Attribute] = {
+    val childrenMetadataOutput = children.map(_.metadataOutput)
+    // This follows similar code in `CheckAnalysis` to check if the output of a Union is correct,
+    // but just silently doesn't return an output instead of throwing an error. It also ensures
+    // that the attribute and data type names are the same.
+    val refDataTypes = childrenMetadataOutput.head.map(_.dataType)
+    val refAttrNames = childrenMetadataOutput.head.map(_.name)
+    childrenMetadataOutput.tail.foreach { childMetadataOutput =>
+      // We can only propagate the metadata output correctly if every child has the same
+      // number of columns
+      if (childMetadataOutput.length != refDataTypes.length) return Nil
+      // Check if the data types match by name and type
+      val childDataTypes = childMetadataOutput.map(_.dataType)
+      childDataTypes.zip(refDataTypes).foreach { case (dt1, dt2) =>
+        if (!DataType.equalsStructurally(dt1, dt2, true) ||
+           !DataType.equalsStructurallyByName(dt1, dt2, conf.resolver)) {

Review Comment:
   Do we need to check `!DataType.equalsStructurally(dt1, dt2, true)`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org