You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/11/10 00:55:50 UTC

[GitHub] [spark] maropu commented on a change in pull request #30300: [SPARK-33399][SQL] Normalize output partitioning of Project with respect to aliases

maropu commented on a change in pull request #30300:
URL: https://github.com/apache/spark/pull/30300#discussion_r520213847



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
##########
@@ -48,13 +42,13 @@ trait AliasAwareOutputExpression extends UnaryExecNode {
  */
 trait AliasAwareOutputPartitioning extends AliasAwareOutputExpression {
   final override def outputPartitioning: Partitioning = {
-    if (hasAlias) {
-      child.outputPartitioning match {
-        case h: HashPartitioning => h.copy(expressions = replaceAliases(h.expressions))

Review comment:
       If we generalize this logic, I think it's better to add tests for all the partitioning cases where possible.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
##########
@@ -48,13 +42,13 @@ trait AliasAwareOutputExpression extends UnaryExecNode {
  */
 trait AliasAwareOutputPartitioning extends AliasAwareOutputExpression {
   final override def outputPartitioning: Partitioning = {
-    if (hasAlias) {
-      child.outputPartitioning match {
-        case h: HashPartitioning => h.copy(expressions = replaceAliases(h.expressions))
-        case other => other
-      }
-    } else {
-      child.outputPartitioning
+    child.outputPartitioning match {
+      case e: Expression if hasAlias =>
+        val normalizedExp = e.transformDown {

Review comment:
       nit: `normalizedExp` -> `normalizedExpr` and `transformDown` -> `transform`

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
##########
@@ -895,6 +895,73 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
     }
   }
 
+  test("No extra exchanges in case of [Inner Join -> Project with aliases -> Inner join]") {
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+      withSQLConf(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key -> "false") {

Review comment:
       Why do we need to turn off constraint propagation?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
##########
@@ -25,20 +25,14 @@ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partition
 trait AliasAwareOutputExpression extends UnaryExecNode {
   protected def outputExpressions: Seq[NamedExpression]
 
-  protected def hasAlias: Boolean = outputExpressions.collectFirst { case _: Alias => }.isDefined
+  lazy val aliasMap = AttributeMap(outputExpressions.collect {
+    case a @ Alias(child: AttributeReference, _) => (child, a.toAttribute)
+  })
 
-  protected def replaceAliases(exprs: Seq[Expression]): Seq[Expression] = {
-    exprs.map {
-      case a: AttributeReference => replaceAlias(a).getOrElse(a)
-      case other => other
-    }
-  }
+  protected def hasAlias: Boolean = aliasMap.nonEmpty
 
   protected def replaceAlias(attr: AttributeReference): Option[Attribute] = {

Review comment:
       It looks like `AliasHelper` has a similar function; could you check whether we can reuse it?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
##########
@@ -25,20 +25,14 @@ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partition
 trait AliasAwareOutputExpression extends UnaryExecNode {
   protected def outputExpressions: Seq[NamedExpression]
 
-  protected def hasAlias: Boolean = outputExpressions.collectFirst { case _: Alias => }.isDefined
+  lazy val aliasMap = AttributeMap(outputExpressions.collect {

Review comment:
       `private`?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
##########
@@ -25,20 +25,14 @@ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partition
 trait AliasAwareOutputExpression extends UnaryExecNode {
   protected def outputExpressions: Seq[NamedExpression]
 
-  protected def hasAlias: Boolean = outputExpressions.collectFirst { case _: Alias => }.isDefined
+  lazy val aliasMap = AttributeMap(outputExpressions.collect {

Review comment:
       Also, could you use `AliasHelper`?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
##########
@@ -48,13 +42,13 @@ trait AliasAwareOutputExpression extends UnaryExecNode {
  */
 trait AliasAwareOutputPartitioning extends AliasAwareOutputExpression {
   final override def outputPartitioning: Partitioning = {
-    if (hasAlias) {
-      child.outputPartitioning match {
-        case h: HashPartitioning => h.copy(expressions = replaceAliases(h.expressions))
-        case other => other
-      }
-    } else {
-      child.outputPartitioning
+    child.outputPartitioning match {
+      case e: Expression if hasAlias =>
+        val normalizedExp = e.transformDown {

Review comment:
       btw, do we need `transform` here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org