You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/08/17 12:20:14 UTC

[GitHub] [spark] peter-toth commented on a diff in pull request #37525: [SPARK-40086][SQL] Improve AliasAwareOutputPartitioning to take all aliases into account

peter-toth commented on code in PR #37525:
URL: https://github.com/apache/spark/pull/37525#discussion_r947860136


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala:
##########
@@ -16,24 +16,56 @@
  */
 package org.apache.spark.sql.execution
 
-import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, NamedExpression, SortOrder}
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, PartitioningCollection, UnknownPartitioning}
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, NamedExpression, SortOrder}
+import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection, UnknownPartitioning}
 
 /**
  * A trait that provides functionality to handle aliases in the `outputExpressions`.
  */
 trait AliasAwareOutputExpression extends UnaryExecNode {
   protected def outputExpressions: Seq[NamedExpression]
 
-  private lazy val aliasMap = outputExpressions.collect {
-    case a @ Alias(child, _) => child.canonicalized -> a.toAttribute
-  }.toMap
+  private lazy val aliasMap = {
+    val aliases = mutable.Map[Expression, mutable.ListBuffer[Attribute]]()
+    // Add aliases to the map. If multiple alias is defined for a source attribute then add all.
+    outputExpressions.foreach {
+      case a @ Alias(child, _) =>
+        // This prepend is needed to make the first element of the `ListBuffer` point to the last
+        // occurrence of an aliased child. This is to keep the previous behavior:
+        // - when we return `Partitioning`s in `outputPartitioning()`, the first should be same as
+        //   it was previously
+        // - when we return a `SortOrder` in `outputOrdering()`, it should be should be same as
+        //   previously
+        a.toAttribute +=: aliases.getOrElseUpdate(child.canonicalized, mutable.ListBuffer.empty)
+      case _ =>
+    }
+    // Append identity mapping of an attribute to the map if both the attribute and its aliased
+    // version can be found in `outputExpressions`.
+    outputExpressions.foreach {
+      case a: Attribute if aliases.contains(a.canonicalized) => aliases(a.canonicalized) += a
+      case _ =>
+    }
+    aliases
+  }
 
   protected def hasAlias: Boolean = aliasMap.nonEmpty
 
-  protected def normalizeExpression(exp: Expression): Expression = {
+  // This function returns a limited number (64) of alternatives to mapped `exp`.

Review Comment:
   sure, done in https://github.com/apache/spark/pull/37525/commits/7266347040cf6af77190c47d91854638aa8a046d, let me know if new conf name doesn't fit



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org