You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/03/09 03:13:45 UTC

[GitHub] [spark] wangyum opened a new pull request #35779: [SPARK-36194][SQL] Add a logical plan visitor to propagate the distinct attributes

wangyum opened a new pull request #35779:
URL: https://github.com/apache/spark/pull/35779


   ### What changes were proposed in this pull request?
   
   1. This pr add a new logical plan visitor named `DistinctKeyVisitor` to find out all the distinct attributes in current logical plan. For example:
      ```scala
      spark.sql("CREATE TABLE t(a int, b int, c int) using parquet")
      spark.sql("SELECT a, b, a % 10, a AS aliased_a, max(c), sum(b) FROM t GROUP BY a, b").queryExecution.analyzed.distinctKeys
      ```
      The output is: {a#1, b#2}, {b#2, aliased_a#0}.
   
   2. Enhance `RemoveRedundantAggregates` to remove the aggregation if it is groupOnly and the child can guarantee distinct. For example:
      ```sql
      set spark.sql.autoBroadcastJoinThreshold=-1; -- avoid PushDownLeftSemiAntiJoin
      create table t1 using parquet as select id a, id as b from range(10);
      create table t2 using parquet as select id as a, id as b from range(8);
      select t11.a, t11.b from (select distinct a, b from t1) t11 left semi join t2 on (t11.a = t2.a) group by t11.a, t11.b;
      ```
   
      Before this PR:
      ```
      == Optimized Logical Plan ==
      Aggregate [a#6L, b#7L], [a#6L, b#7L], Statistics(sizeInBytes=1492.0 B)
      +- Join LeftSemi, (a#6L = a#8L), Statistics(sizeInBytes=1492.0 B)
         :- Aggregate [a#6L, b#7L], [a#6L, b#7L], Statistics(sizeInBytes=1492.0 B)
         :  +- Filter isnotnull(a#6L), Statistics(sizeInBytes=1492.0 B)
         :     +- Relation default.t1[a#6L,b#7L] parquet, Statistics(sizeInBytes=1492.0 B)
         +- Project [a#8L], Statistics(sizeInBytes=984.0 B)
            +- Filter isnotnull(a#8L), Statistics(sizeInBytes=1476.0 B)
               +- Relation default.t2[a#8L,b#9L] parquet, Statistics(sizeInBytes=1476.0 B)
      ```
   
      After this PR:
      ```
      == Optimized Logical Plan ==
      Join LeftSemi, (a#6L = a#8L), Statistics(sizeInBytes=1492.0 B)
      :- Aggregate [a#6L, b#7L], [a#6L, b#7L], Statistics(sizeInBytes=1492.0 B)
      :  +- Filter isnotnull(a#6L), Statistics(sizeInBytes=1492.0 B)
      :     +- Relation default.t1[a#6L,b#7L] parquet, Statistics(sizeInBytes=1492.0 B)
      +- Project [a#8L], Statistics(sizeInBytes=984.0 B)
         +- Filter isnotnull(a#8L), Statistics(sizeInBytes=1476.0 B)
            +- Relation default.t2[a#8L,b#9L] parquet, Statistics(sizeInBytes=1476.0 B)
      ```
   
   ### Why are the changes needed?
   
   Improve query performance.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Unit test and TPC-DS benchmark test.
   
   SQL | Before this PR(Seconds) | After this PR(Seconds)
   -- | -- | --
   q14a | 206  | 193
   q38 | 59 | 41
   q87 | 127 | 113
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wangyum commented on a change in pull request #35779: [SPARK-36194][SQL] Add a logical plan visitor to propagate the distinct attributes

Posted by GitBox <gi...@apache.org>.
wangyum commented on a change in pull request #35779:
URL: https://github.com/apache/spark/pull/35779#discussion_r823268926



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitor.scala
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, ExpressionSet, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter, LeftSemiOrAnti, RightOuter}
+
+/**
+ * A visitor pattern for traversing a [[LogicalPlan]] tree and propagate the distinct attributes.
+ */
+object DistinctKeyVisitor extends LogicalPlanVisitor[Set[ExpressionSet]] {
+
+  private def projectDistinctKeys(
+      keys: Set[ExpressionSet], projectList: Seq[NamedExpression]): Set[ExpressionSet] = {
+    val outputSet = ExpressionSet(projectList.map(_.toAttribute))
+    val distinctKeys = keys.filter(_.subsetOf(outputSet))

Review comment:
       OK




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a change in pull request #35779: [SPARK-36194][SQL] Add a logical plan visitor to propagate the distinct attributes

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35779:
URL: https://github.com/apache/spark/pull/35779#discussion_r822385875



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitor.scala
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, ExpressionSet, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter, LeftSemiOrAnti, RightOuter}
+
+/**
+ * A visitor pattern for traversing a [[LogicalPlan]] tree and propagate the distinct attributes.
+ */
+object DistinctKeyVisitor extends LogicalPlanVisitor[Set[ExpressionSet]] {
+
+  private def projectDistinctKeys(
+      keys: Set[ExpressionSet], projectList: Seq[NamedExpression]): Set[ExpressionSet] = {
+    val outputSet = ExpressionSet(projectList.map(_.toAttribute))
+    val distinctKeys = keys.filter(_.subsetOf(outputSet))
+    val aliases = projectList.filter(_.isInstanceOf[Alias])
+    if (aliases.isEmpty) {
+      distinctKeys
+    } else {
+      val aliasedDistinctKeys = keys.map { expressionSet =>
+        expressionSet.map { expression =>
+          expression transform {
+            case expr: Expression =>
+              aliases
+                .collectFirst { case a: Alias if a.child.semanticEquals(expr) => a.toAttribute }
+                .getOrElse(expr)
+          }
+        }
+      }
+      aliasedDistinctKeys.collect {
+        case es: ExpressionSet if es.subsetOf(outputSet) => ExpressionSet(es)
+      } ++ distinctKeys
+    }.filter(_.nonEmpty)
+  }
+
+  override def default(p: LogicalPlan): Set[ExpressionSet] = Set.empty[ExpressionSet]
+
+  override def visitAggregate(p: Aggregate): Set[ExpressionSet] = {
+    val groupingExps = ExpressionSet(p.groupingExpressions) // handle group by a, a
+    projectDistinctKeys(Set(groupingExps), p.aggregateExpressions)
+  }
+
+  override def visitDistinct(p: Distinct): Set[ExpressionSet] = Set(ExpressionSet(p.output))
+
+  override def visitExcept(p: Except): Set[ExpressionSet] =
+    if (!p.isAll && p.deterministic) Set(ExpressionSet(p.output)) else default(p)
+
+  override def visitExpand(p: Expand): Set[ExpressionSet] = default(p)
+
+  override def visitFilter(p: Filter): Set[ExpressionSet] = p.child.distinctKeys
+
+  override def visitGenerate(p: Generate): Set[ExpressionSet] = default(p)
+
+  override def visitGlobalLimit(p: GlobalLimit): Set[ExpressionSet] = {
+    p.maxRows match {
+      case Some(value) if value <= 1 => Set(ExpressionSet(p.output))
+      case _ => p.child.distinctKeys
+    }
+  }
+
+  override def visitIntersect(p: Intersect): Set[ExpressionSet] = {
+    if (!p.isAll && p.deterministic) Set(ExpressionSet(p.output)) else default(p)
+  }
+
+  override def visitJoin(p: Join): Set[ExpressionSet] = {
+    p match {
+      case Join(_, _, LeftSemiOrAnti(_), _, _) =>
+        p.left.distinctKeys
+      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, _, _, left, right, _)
+          if left.distinctKeys.nonEmpty || right.distinctKeys.nonEmpty =>
+        val rightJoinKeySet = ExpressionSet(rightKeys)
+        val leftJoinKeySet = ExpressionSet(leftKeys)
+        joinType match {
+          case Inner if left.distinctKeys.exists(_.subsetOf(leftJoinKeySet)) &&
+            right.distinctKeys.exists(_.subsetOf(rightJoinKeySet)) =>
+            left.distinctKeys ++ right.distinctKeys

Review comment:
       even if only `left.distinctKeys.exists(_.subsetOf(leftJoinKeySet))` is true, I think we can still propagate `left.distinctKeys`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wangyum closed pull request #35779: [SPARK-36194][SQL] Add a logical plan visitor to propagate the distinct attributes

Posted by GitBox <gi...@apache.org>.
wangyum closed pull request #35779:
URL: https://github.com/apache/spark/pull/35779


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wangyum commented on a change in pull request #35779: [SPARK-36194][SQL] Add a logical plan visitor to propagate the distinct attributes

Posted by GitBox <gi...@apache.org>.
wangyum commented on a change in pull request #35779:
URL: https://github.com/apache/spark/pull/35779#discussion_r822636586



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitor.scala
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, ExpressionSet, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter, LeftSemiOrAnti, RightOuter}
+
+/**
+ * A visitor pattern for traversing a [[LogicalPlan]] tree and propagate the distinct attributes.
+ */
+object DistinctKeyVisitor extends LogicalPlanVisitor[Set[ExpressionSet]] {
+
+  private def projectDistinctKeys(
+      keys: Set[ExpressionSet], projectList: Seq[NamedExpression]): Set[ExpressionSet] = {
+    val outputSet = ExpressionSet(projectList.map(_.toAttribute))
+    val distinctKeys = keys.filter(_.subsetOf(outputSet))

Review comment:
       Line 49 also need it: https://github.com/apache/spark/blob/a900b20c03488a53dd67594b0fd5509281f1aa72/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitor.scala#L49

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitor.scala
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, ExpressionSet, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter, LeftSemiOrAnti, RightOuter}
+
+/**
+ * A visitor pattern for traversing a [[LogicalPlan]] tree and propagate the distinct attributes.
+ */
+object DistinctKeyVisitor extends LogicalPlanVisitor[Set[ExpressionSet]] {
+
+  private def projectDistinctKeys(
+      keys: Set[ExpressionSet], projectList: Seq[NamedExpression]): Set[ExpressionSet] = {
+    val outputSet = ExpressionSet(projectList.map(_.toAttribute))
+    val distinctKeys = keys.filter(_.subsetOf(outputSet))
+    val aliases = projectList.filter(_.isInstanceOf[Alias])
+    if (aliases.isEmpty) {
+      distinctKeys
+    } else {
+      val aliasedDistinctKeys = keys.map { expressionSet =>
+        expressionSet.map { expression =>
+          expression transform {
+            case expr: Expression =>
+              aliases
+                .collectFirst { case a: Alias if a.child.semanticEquals(expr) => a.toAttribute }
+                .getOrElse(expr)
+          }
+        }
+      }
+      aliasedDistinctKeys.collect {
+        case es: ExpressionSet if es.subsetOf(outputSet) => ExpressionSet(es)
+      } ++ distinctKeys
+    }.filter(_.nonEmpty)
+  }
+
+  override def default(p: LogicalPlan): Set[ExpressionSet] = Set.empty[ExpressionSet]
+
+  override def visitAggregate(p: Aggregate): Set[ExpressionSet] = {
+    val groupingExps = ExpressionSet(p.groupingExpressions) // handle group by a, a
+    projectDistinctKeys(Set(groupingExps), p.aggregateExpressions)
+  }
+
+  override def visitDistinct(p: Distinct): Set[ExpressionSet] = Set(ExpressionSet(p.output))
+
+  override def visitExcept(p: Except): Set[ExpressionSet] =
+    if (!p.isAll && p.deterministic) Set(ExpressionSet(p.output)) else default(p)

Review comment:
       Removed it.

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitor.scala
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, ExpressionSet, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter, LeftSemiOrAnti, RightOuter}
+
+/**
+ * A visitor pattern for traversing a [[LogicalPlan]] tree and propagate the distinct attributes.
+ */
+object DistinctKeyVisitor extends LogicalPlanVisitor[Set[ExpressionSet]] {
+
+  private def projectDistinctKeys(
+      keys: Set[ExpressionSet], projectList: Seq[NamedExpression]): Set[ExpressionSet] = {
+    val outputSet = ExpressionSet(projectList.map(_.toAttribute))
+    val distinctKeys = keys.filter(_.subsetOf(outputSet))
+    val aliases = projectList.filter(_.isInstanceOf[Alias])
+    if (aliases.isEmpty) {
+      distinctKeys
+    } else {
+      val aliasedDistinctKeys = keys.map { expressionSet =>
+        expressionSet.map { expression =>
+          expression transform {
+            case expr: Expression =>
+              aliases
+                .collectFirst { case a: Alias if a.child.semanticEquals(expr) => a.toAttribute }
+                .getOrElse(expr)
+          }
+        }
+      }
+      aliasedDistinctKeys.collect {
+        case es: ExpressionSet if es.subsetOf(outputSet) => ExpressionSet(es)
+      } ++ distinctKeys
+    }.filter(_.nonEmpty)
+  }
+
+  override def default(p: LogicalPlan): Set[ExpressionSet] = Set.empty[ExpressionSet]
+
+  override def visitAggregate(p: Aggregate): Set[ExpressionSet] = {
+    val groupingExps = ExpressionSet(p.groupingExpressions) // handle group by a, a
+    projectDistinctKeys(Set(groupingExps), p.aggregateExpressions)
+  }
+
+  override def visitDistinct(p: Distinct): Set[ExpressionSet] = Set(ExpressionSet(p.output))
+
+  override def visitExcept(p: Except): Set[ExpressionSet] =
+    if (!p.isAll && p.deterministic) Set(ExpressionSet(p.output)) else default(p)
+
+  override def visitExpand(p: Expand): Set[ExpressionSet] = default(p)
+
+  override def visitFilter(p: Filter): Set[ExpressionSet] = p.child.distinctKeys
+
+  override def visitGenerate(p: Generate): Set[ExpressionSet] = default(p)
+
+  override def visitGlobalLimit(p: GlobalLimit): Set[ExpressionSet] = {
+    p.maxRows match {
+      case Some(value) if value <= 1 => Set(ExpressionSet(p.output))
+      case _ => p.child.distinctKeys
+    }
+  }
+
+  override def visitIntersect(p: Intersect): Set[ExpressionSet] = {
+    if (!p.isAll && p.deterministic) Set(ExpressionSet(p.output)) else default(p)

Review comment:
       Removed it.

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitor.scala
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, ExpressionSet, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter, LeftSemiOrAnti, RightOuter}
+
+/**
+ * A visitor pattern for traversing a [[LogicalPlan]] tree and propagate the distinct attributes.
+ */
+object DistinctKeyVisitor extends LogicalPlanVisitor[Set[ExpressionSet]] {
+
+  private def projectDistinctKeys(
+      keys: Set[ExpressionSet], projectList: Seq[NamedExpression]): Set[ExpressionSet] = {
+    val outputSet = ExpressionSet(projectList.map(_.toAttribute))
+    val distinctKeys = keys.filter(_.subsetOf(outputSet))
+    val aliases = projectList.filter(_.isInstanceOf[Alias])
+    if (aliases.isEmpty) {
+      distinctKeys
+    } else {
+      val aliasedDistinctKeys = keys.map { expressionSet =>
+        expressionSet.map { expression =>
+          expression transform {
+            case expr: Expression =>
+              aliases
+                .collectFirst { case a: Alias if a.child.semanticEquals(expr) => a.toAttribute }
+                .getOrElse(expr)
+          }
+        }
+      }
+      aliasedDistinctKeys.collect {
+        case es: ExpressionSet if es.subsetOf(outputSet) => ExpressionSet(es)
+      } ++ distinctKeys
+    }.filter(_.nonEmpty)
+  }
+
+  override def default(p: LogicalPlan): Set[ExpressionSet] = Set.empty[ExpressionSet]
+
+  override def visitAggregate(p: Aggregate): Set[ExpressionSet] = {
+    val groupingExps = ExpressionSet(p.groupingExpressions) // handle group by a, a
+    projectDistinctKeys(Set(groupingExps), p.aggregateExpressions)
+  }
+
+  override def visitDistinct(p: Distinct): Set[ExpressionSet] = Set(ExpressionSet(p.output))
+
+  override def visitExcept(p: Except): Set[ExpressionSet] =
+    if (!p.isAll && p.deterministic) Set(ExpressionSet(p.output)) else default(p)
+
+  override def visitExpand(p: Expand): Set[ExpressionSet] = default(p)
+
+  override def visitFilter(p: Filter): Set[ExpressionSet] = p.child.distinctKeys
+
+  override def visitGenerate(p: Generate): Set[ExpressionSet] = default(p)
+
+  override def visitGlobalLimit(p: GlobalLimit): Set[ExpressionSet] = {
+    p.maxRows match {
+      case Some(value) if value <= 1 => Set(ExpressionSet(p.output))
+      case _ => p.child.distinctKeys
+    }
+  }
+
+  override def visitIntersect(p: Intersect): Set[ExpressionSet] = {
+    if (!p.isAll && p.deterministic) Set(ExpressionSet(p.output)) else default(p)
+  }
+
+  override def visitJoin(p: Join): Set[ExpressionSet] = {
+    p match {
+      case Join(_, _, LeftSemiOrAnti(_), _, _) =>
+        p.left.distinctKeys
+      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, _, _, left, right, _)
+          if left.distinctKeys.nonEmpty || right.distinctKeys.nonEmpty =>
+        val rightJoinKeySet = ExpressionSet(rightKeys)
+        val leftJoinKeySet = ExpressionSet(leftKeys)
+        joinType match {
+          case Inner if left.distinctKeys.exists(_.subsetOf(leftJoinKeySet)) &&
+            right.distinctKeys.exists(_.subsetOf(rightJoinKeySet)) =>
+            left.distinctKeys ++ right.distinctKeys
+          case Inner | LeftOuter if right.distinctKeys.exists(_.subsetOf(rightJoinKeySet)) =>
+            p.left.distinctKeys
+          case Inner | RightOuter if left.distinctKeys.exists(_.subsetOf(leftJoinKeySet)) =>
+            p.right.distinctKeys
+          case _ =>
+            default(p)
+        }
+      case _ => default(p)
+    }
+  }
+
+  override def visitLocalLimit(p: LocalLimit): Set[ExpressionSet] = p.child.distinctKeys
+
+  override def visitPivot(p: Pivot): Set[ExpressionSet] = default(p)
+
+  override def visitProject(p: Project): Set[ExpressionSet] = {
+    if (p.child.distinctKeys.nonEmpty) {
+      projectDistinctKeys(p.child.distinctKeys.map(ExpressionSet(_)), p.projectList)

Review comment:
       Yes.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a change in pull request #35779: [SPARK-36194][SQL] Add a logical plan visitor to propagate the distinct attributes

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35779:
URL: https://github.com/apache/spark/pull/35779#discussion_r822772435



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitor.scala
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, ExpressionSet, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter, LeftSemiOrAnti, RightOuter}
+
+/**
+ * A visitor pattern for traversing a [[LogicalPlan]] tree and propagate the distinct attributes.
+ */
+object DistinctKeyVisitor extends LogicalPlanVisitor[Set[ExpressionSet]] {
+
+  private def projectDistinctKeys(
+      keys: Set[ExpressionSet], projectList: Seq[NamedExpression]): Set[ExpressionSet] = {
+    val outputSet = ExpressionSet(projectList.map(_.toAttribute))
+    val distinctKeys = keys.filter(_.subsetOf(outputSet))

Review comment:
       Then do the filter again in L49. It's incorrect to do this filter this early.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wangyum commented on pull request #35779: [SPARK-36194][SQL] Add a logical plan visitor to propagate the distinct attributes

Posted by GitBox <gi...@apache.org>.
wangyum commented on pull request #35779:
URL: https://github.com/apache/spark/pull/35779#issuecomment-1063598332


   > Do we see regressions in any TPCDS queries?
   
   There is no regression.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wangyum commented on a change in pull request #35779: [SPARK-36194][SQL] Add a logical plan visitor to propagate the distinct attributes

Posted by GitBox <gi...@apache.org>.
wangyum commented on a change in pull request #35779:
URL: https://github.com/apache/spark/pull/35779#discussion_r823311918



##########
File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregatesSuite.scala
##########
@@ -153,8 +150,14 @@ class RemoveRedundantAggregatesSuite extends PlanTest {
     comparePlans(optimized, expected)
   }
 
+  test("Remove redundant aggregate - upper has contains foldable expressions") {
+    val originalQuery = x.groupBy('a, 'b)('a, 'b).groupBy('a)('a, TrueLiteral).analyze

Review comment:
       https://github.com/apache/spark/pull/35795 to address this.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wangyum commented on a change in pull request #35779: [SPARK-36194][SQL] Add a logical plan visitor to propagate the distinct attributes

Posted by GitBox <gi...@apache.org>.
wangyum commented on a change in pull request #35779:
URL: https://github.com/apache/spark/pull/35779#discussion_r822654682



##########
File path: sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q38/explain.txt
##########
@@ -1,54 +1,51 @@
 == Physical Plan ==

Review comment:
       q14a/a14b reduces 1 Exchange and 2 HashAggregates.
   q38 reduces 3 Exchange and 4 HashAggregates.
   q38 reduces 3 Exchange and 4 HashAggregates.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a change in pull request #35779: [SPARK-36194][SQL] Add a logical plan visitor to propagate the distinct attributes

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35779:
URL: https://github.com/apache/spark/pull/35779#discussion_r822374095



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala
##########
@@ -47,6 +47,17 @@ object RemoveRedundantAggregates extends Rule[LogicalPlan] with AliasHelper {
       } else {
         newAggregate
       }
+
+     case agg @ Aggregate(groupingExps, _, child)
+         if agg.groupOnly && child.deterministic &&

Review comment:
       I still don't get why `child.deterministic` is required. If the child output is completely random, then it should not report any distinct keys.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on pull request #35779: [SPARK-36194][SQL] Add a logical plan visitor to propagate the distinct attributes

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on pull request #35779:
URL: https://github.com/apache/spark/pull/35779#issuecomment-1062671217


   Do we see regressions in any TPCDS queries?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a change in pull request #35779: [SPARK-36194][SQL] Add a logical plan visitor to propagate the distinct attributes

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35779:
URL: https://github.com/apache/spark/pull/35779#discussion_r822377275



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitor.scala
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, ExpressionSet, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter, LeftSemiOrAnti, RightOuter}
+
+/**
+ * A visitor pattern for traversing a [[LogicalPlan]] tree and propagate the distinct attributes.
+ */
+object DistinctKeyVisitor extends LogicalPlanVisitor[Set[ExpressionSet]] {
+
+  private def projectDistinctKeys(
+      keys: Set[ExpressionSet], projectList: Seq[NamedExpression]): Set[ExpressionSet] = {
+    val outputSet = ExpressionSet(projectList.map(_.toAttribute))
+    val distinctKeys = keys.filter(_.subsetOf(outputSet))
+    val aliases = projectList.filter(_.isInstanceOf[Alias])
+    if (aliases.isEmpty) {
+      distinctKeys
+    } else {
+      val aliasedDistinctKeys = keys.map { expressionSet =>
+        expressionSet.map { expression =>
+          expression transform {
+            case expr: Expression =>
+              aliases
+                .collectFirst { case a: Alias if a.child.semanticEquals(expr) => a.toAttribute }

Review comment:
       can we add a TODO comment here? `// TODO: Expand distinctKeys for redundant aliases on the same expression.`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wangyum commented on a change in pull request #35779: [SPARK-36194][SQL] Add a logical plan visitor to propagate the distinct attributes

Posted by GitBox <gi...@apache.org>.
wangyum commented on a change in pull request #35779:
URL: https://github.com/apache/spark/pull/35779#discussion_r822507351



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala
##########
@@ -47,6 +47,17 @@ object RemoveRedundantAggregates extends Rule[LogicalPlan] with AliasHelper {
       } else {
         newAggregate
       }
+
+     case agg @ Aggregate(groupingExps, _, child)
+         if agg.groupOnly && child.deterministic &&
+           child.distinctKeys.exists(_.subsetOf(ExpressionSet(groupingExps))) =>
+      Project(agg.aggregateExpressions, child)
+
+    case agg @ Aggregate(groupingExps, aggregateExps, child)
+        if aggregateExps.forall(a => a.isInstanceOf[Alias] && a.children.forall(_.foldable)) &&

Review comment:
       No. `Alias` is'n foldable, but the child is foldable.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wangyum commented on a change in pull request #35779: [SPARK-36194][SQL] Add a logical plan visitor to propagate the distinct attributes

Posted by GitBox <gi...@apache.org>.
wangyum commented on a change in pull request #35779:
URL: https://github.com/apache/spark/pull/35779#discussion_r822635270



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala
##########
@@ -47,6 +47,17 @@ object RemoveRedundantAggregates extends Rule[LogicalPlan] with AliasHelper {
       } else {
         newAggregate
       }
+
+     case agg @ Aggregate(groupingExps, _, child)
+         if agg.groupOnly && child.deterministic &&

Review comment:
       After thinking twice, it is not required.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a change in pull request #35779: [SPARK-36194][SQL] Add a logical plan visitor to propagate the distinct attributes

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35779:
URL: https://github.com/apache/spark/pull/35779#discussion_r822775145



##########
File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregatesSuite.scala
##########
@@ -153,8 +150,14 @@ class RemoveRedundantAggregatesSuite extends PlanTest {
     comparePlans(optimized, expected)
   }
 
+  test("Remove redundant aggregate - upper has contains foldable expressions") {
+    val originalQuery = x.groupBy('a, 'b)('a, 'b).groupBy('a)('a, TrueLiteral).analyze

Review comment:
       ah this works because these two aggregates are adjacent. If they are not, we have a problem.
   
   I'm thinking that we should refine `Aggregate.groupOnly`: `.groupBy('a)('a, TrueLiteral)` is also group only.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a change in pull request #35779: [SPARK-36194][SQL] Add a logical plan visitor to propagate the distinct attributes

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35779:
URL: https://github.com/apache/spark/pull/35779#discussion_r822394215



##########
File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregatesSuite.scala
##########
@@ -166,12 +169,127 @@ class RemoveRedundantAggregatesSuite extends PlanTest {
   }
 
   test("Keep non-redundant aggregate - upper references non-deterministic non-grouping") {
-    val relation = LocalRelation('a.int, 'b.int)
     val query = relation
       .groupBy('a)('a, ('a + rand(0)) as 'c)
       .groupBy('a, 'c)('a, 'c)
       .analyze
     val optimized = Optimize.execute(query)
     comparePlans(optimized, query)
   }
+
+  test("SPARK-36194: Remove aggregation from left semi/anti join if aggregation the same") {
+    Seq(LeftSemi, LeftAnti).foreach { joinType =>
+      val originalQuery = x.groupBy('a, 'b)('a, 'b)
+        .join(y, joinType, Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr))
+        .groupBy("x.a".attr, "x.b".attr)("x.a".attr, "x.b".attr)
+      val correctAnswer = x.groupBy('a, 'b)('a, 'b)
+        .join(y, joinType, Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr))
+        .select("x.a".attr, "x.b".attr)
+
+      val optimized = Optimize.execute(originalQuery.analyze)
+      comparePlans(optimized, correctAnswer.analyze)
+    }
+  }
+
+  test("SPARK-36194: Remove aggregation from left semi/anti join with alias") {
+    Seq(LeftSemi, LeftAnti).foreach { joinType =>
+      val originalQuery = x.groupBy('a, 'b)('a, 'b.as("d"))
+        .join(y, joinType, Some("x.a".attr === "y.a".attr && "d".attr === "y.b".attr))
+        .groupBy("x.a".attr, "d".attr)("x.a".attr, "d".attr)
+      val correctAnswer = x.groupBy('a, 'b)('a, 'b.as("d"))
+        .join(y, joinType, Some("x.a".attr === "y.a".attr && "d".attr === "y.b".attr))
+        .select("x.a".attr, "d".attr)
+
+      val optimized = Optimize.execute(originalQuery.analyze)
+      comparePlans(optimized, correctAnswer.analyze)
+    }
+  }
+
+  test("SPARK-36194: Remove aggregation from left semi/anti join if it is the sub aggregateExprs") {
+    Seq(LeftSemi, LeftAnti).foreach { joinType =>
+      val originalQuery = x.groupBy('a, 'b)('a, 'b)
+        .join(y, joinType, Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr))
+        .groupBy("x.a".attr, "x.b".attr)("x.a".attr)
+      val correctAnswer = x.groupBy('a, 'b)('a, 'b)
+        .join(y, joinType, Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr))
+        .select("x.a".attr)
+
+      val optimized = Optimize.execute(originalQuery.analyze)
+      comparePlans(optimized, correctAnswer.analyze)
+    }
+  }
+
+  test("SPARK-36194: Transform down to remove more aggregates") {
+    Seq(LeftSemi, LeftAnti).foreach { joinType =>
+      val originalQuery = x.groupBy('a, 'b)('a, 'b)
+        .join(y, joinType, Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr))
+        .groupBy("x.a".attr, "x.b".attr)("x.a".attr, "x.b".attr)
+        .join(y, joinType, Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr))
+        .groupBy("x.a".attr, "x.b".attr)("x.a".attr)
+      val correctAnswer = x.groupBy('a, 'b)('a, 'b)
+        .join(y, joinType, Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr))
+        .select("x.a".attr, "x.b".attr)
+        .join(y, joinType, Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr))
+        .select("x.a".attr)
+
+      val optimized = Optimize.execute(originalQuery.analyze)
+      comparePlans(optimized, correctAnswer.analyze)
+    }
+  }
+
+  test("SPARK-36194: Child distinct keys is the subset of required keys") {
+    val originalQuery = relation
+      .groupBy('a)('a, count('b).as("cnt"))
+      .groupBy('a, 'cnt)('a, 'cnt)
+      .analyze
+    val correctAnswer = relation
+      .groupBy('a)('a, count('b).as("cnt"))
+      .select('a, 'cnt)
+      .analyze
+    val optimized = Optimize.execute(originalQuery)
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("SPARK-36194: Child distinct keys are subsets and aggregateExpressions are foldable") {
+    val originalQuery = x.groupBy('a, 'b)('a, 'b)
+      .join(y, LeftSemi, Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr))
+      .groupBy("x.a".attr, "x.b".attr)(TrueLiteral)
+      .analyze
+    val correctAnswer = x.groupBy('a, 'b)('a, 'b)
+      .join(y, LeftSemi, Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr))
+      .select(TrueLiteral)
+      .analyze
+    val optimized = Optimize.execute(originalQuery)
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("SPARK-36194: Negative case: child distinct keys is not the subset of required keys") {
+    Seq(LeftSemi, LeftAnti).foreach { joinType =>
+      val originalQuery1 = x.groupBy('a, 'b)('a, 'b)
+        .join(y, joinType, Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr))
+        .groupBy("x.a".attr)("x.a".attr)
+        .analyze
+      comparePlans(Optimize.execute(originalQuery1), originalQuery1)
+
+      val originalQuery2 = x.groupBy('a, 'b)('a, 'b)
+        .join(y, joinType, Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr))
+        .groupBy("x.a".attr)(count("x.b".attr))
+        .analyze
+      comparePlans(Optimize.execute(originalQuery2), originalQuery2)
+    }
+  }
+
+  test("SPARK-36194: Negative case: child distinct keys is empty") {
+    val originalQuery = Distinct(x.groupBy('a, 'b)('a, TrueLiteral)).analyze
+    comparePlans(Optimize.execute(originalQuery), originalQuery)
+  }
+
+  test("SPARK-36194: Negative case: Remove aggregation from contains non-deterministic") {
+    val query = relation
+      .groupBy('a)('a, (count('b) + rand(0)).as("cnt"))
+      .groupBy('a, 'cnt)('a, 'cnt)

Review comment:
       what can go wrong if we optimize this case?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a change in pull request #35779: [SPARK-36194][SQL] Add a logical plan visitor to propagate the distinct attributes

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35779:
URL: https://github.com/apache/spark/pull/35779#discussion_r824415213



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala
##########
@@ -47,6 +47,15 @@ object RemoveRedundantAggregates extends Rule[LogicalPlan] with AliasHelper {
       } else {
         newAggregate
       }
+
+    case agg @ Aggregate(groupingExps, _, child)
+        if agg.groupOnly && child.distinctKeys.exists(_.subsetOf(ExpressionSet(groupingExps))) =>
+      Project(agg.aggregateExpressions, child)
+
+    case agg @ Aggregate(groupingExps, aggregateExps, child)

Review comment:
       Do we still need this case? `agg.groupOnly` should cover it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wangyum commented on pull request #35779: [SPARK-36194][SQL] Add a logical plan visitor to propagate the distinct attributes

Posted by GitBox <gi...@apache.org>.
wangyum commented on pull request #35779:
URL: https://github.com/apache/spark/pull/35779#issuecomment-1066828603


   Merged to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wangyum closed pull request #35779: [SPARK-36194][SQL] Add a logical plan visitor to propagate the distinct attributes

Posted by GitBox <gi...@apache.org>.
wangyum closed pull request #35779:
URL: https://github.com/apache/spark/pull/35779


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a change in pull request #35779: [SPARK-36194][SQL] Add a logical plan visitor to propagate the distinct attributes

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35779:
URL: https://github.com/apache/spark/pull/35779#discussion_r822395110



##########
File path: sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q38/explain.txt
##########
@@ -1,54 +1,51 @@
 == Physical Plan ==

Review comment:
       can you summarize the plan changes? do we have less shuffles now?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wangyum commented on a change in pull request #35779: [SPARK-36194][SQL] Add a logical plan visitor to propagate the distinct attributes

Posted by GitBox <gi...@apache.org>.
wangyum commented on a change in pull request #35779:
URL: https://github.com/apache/spark/pull/35779#discussion_r822635410



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitor.scala
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, ExpressionSet, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter, LeftSemiOrAnti, RightOuter}
+
+/**
+ * A visitor pattern for traversing a [[LogicalPlan]] tree and propagate the distinct attributes.
+ */
+object DistinctKeyVisitor extends LogicalPlanVisitor[Set[ExpressionSet]] {
+
+  private def projectDistinctKeys(
+      keys: Set[ExpressionSet], projectList: Seq[NamedExpression]): Set[ExpressionSet] = {
+    val outputSet = ExpressionSet(projectList.map(_.toAttribute))
+    val distinctKeys = keys.filter(_.subsetOf(outputSet))
+    val aliases = projectList.filter(_.isInstanceOf[Alias])
+    if (aliases.isEmpty) {
+      distinctKeys
+    } else {
+      val aliasedDistinctKeys = keys.map { expressionSet =>
+        expressionSet.map { expression =>
+          expression transform {
+            case expr: Expression =>
+              aliases
+                .collectFirst { case a: Alias if a.child.semanticEquals(expr) => a.toAttribute }

Review comment:
       OK




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wangyum removed a comment on pull request #35779: [SPARK-36194][SQL] Add a logical plan visitor to propagate the distinct attributes

Posted by GitBox <gi...@apache.org>.
wangyum removed a comment on pull request #35779:
URL: https://github.com/apache/spark/pull/35779#issuecomment-1062579525


   retest this please.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wangyum commented on pull request #35779: [SPARK-36194][SQL] Add a logical plan visitor to propagate the distinct attributes

Posted by GitBox <gi...@apache.org>.
wangyum commented on pull request #35779:
URL: https://github.com/apache/spark/pull/35779#issuecomment-1062579525


   retest this please.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a change in pull request #35779: [SPARK-36194][SQL] Add a logical plan visitor to propagate the distinct attributes

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35779:
URL: https://github.com/apache/spark/pull/35779#discussion_r822378441



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitor.scala
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, ExpressionSet, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter, LeftSemiOrAnti, RightOuter}
+
+/**
+ * A visitor pattern for traversing a [[LogicalPlan]] tree and propagate the distinct attributes.
+ */
+object DistinctKeyVisitor extends LogicalPlanVisitor[Set[ExpressionSet]] {
+
+  private def projectDistinctKeys(
+      keys: Set[ExpressionSet], projectList: Seq[NamedExpression]): Set[ExpressionSet] = {
+    val outputSet = ExpressionSet(projectList.map(_.toAttribute))
+    val distinctKeys = keys.filter(_.subsetOf(outputSet))

Review comment:
       I think we should only do this filter in `if (aliases.isEmpty)`. The distinct key can be `a + b` and the project list may have `a + b AS c`, then the result distinct key should be `c`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wangyum commented on a change in pull request #35779: [SPARK-36194][SQL] Add a logical plan visitor to propagate the distinct attributes

Posted by GitBox <gi...@apache.org>.
wangyum commented on a change in pull request #35779:
URL: https://github.com/apache/spark/pull/35779#discussion_r822637528



##########
File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregatesSuite.scala
##########
@@ -153,8 +150,14 @@ class RemoveRedundantAggregatesSuite extends PlanTest {
     comparePlans(optimized, expected)
   }
 
+  test("Remove redundant aggregate - upper has contains foldable expressions") {
+    val originalQuery = x.groupBy('a, 'b)('a, 'b).groupBy('a)('a, TrueLiteral).analyze

Review comment:
       `RemoveRedundantAggregates` already supported before this PR.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a change in pull request #35779: [SPARK-36194][SQL] Add a logical plan visitor to propagate the distinct attributes

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35779:
URL: https://github.com/apache/spark/pull/35779#discussion_r822375128



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala
##########
@@ -47,6 +47,17 @@ object RemoveRedundantAggregates extends Rule[LogicalPlan] with AliasHelper {
       } else {
         newAggregate
       }
+
+     case agg @ Aggregate(groupingExps, _, child)
+         if agg.groupOnly && child.deterministic &&
+           child.distinctKeys.exists(_.subsetOf(ExpressionSet(groupingExps))) =>
+      Project(agg.aggregateExpressions, child)
+
+    case agg @ Aggregate(groupingExps, aggregateExps, child)
+        if aggregateExps.forall(a => a.isInstanceOf[Alias] && a.children.forall(_.foldable)) &&

Review comment:
       isn't it just `a.foldable`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wangyum commented on a change in pull request #35779: [SPARK-36194][SQL] Add a logical plan visitor to propagate the distinct attributes

Posted by GitBox <gi...@apache.org>.
wangyum commented on a change in pull request #35779:
URL: https://github.com/apache/spark/pull/35779#discussion_r822637817



##########
File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregatesSuite.scala
##########
@@ -166,12 +169,127 @@ class RemoveRedundantAggregatesSuite extends PlanTest {
   }
 
   test("Keep non-redundant aggregate - upper references non-deterministic non-grouping") {
-    val relation = LocalRelation('a.int, 'b.int)
     val query = relation
       .groupBy('a)('a, ('a + rand(0)) as 'c)
       .groupBy('a, 'c)('a, 'c)
       .analyze
     val optimized = Optimize.execute(query)
     comparePlans(optimized, query)
   }
+
+  test("SPARK-36194: Remove aggregation from left semi/anti join if aggregation the same") {
+    Seq(LeftSemi, LeftAnti).foreach { joinType =>
+      val originalQuery = x.groupBy('a, 'b)('a, 'b)
+        .join(y, joinType, Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr))
+        .groupBy("x.a".attr, "x.b".attr)("x.a".attr, "x.b".attr)
+      val correctAnswer = x.groupBy('a, 'b)('a, 'b)
+        .join(y, joinType, Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr))
+        .select("x.a".attr, "x.b".attr)
+
+      val optimized = Optimize.execute(originalQuery.analyze)
+      comparePlans(optimized, correctAnswer.analyze)
+    }
+  }
+
+  test("SPARK-36194: Remove aggregation from left semi/anti join with alias") {
+    Seq(LeftSemi, LeftAnti).foreach { joinType =>
+      val originalQuery = x.groupBy('a, 'b)('a, 'b.as("d"))
+        .join(y, joinType, Some("x.a".attr === "y.a".attr && "d".attr === "y.b".attr))
+        .groupBy("x.a".attr, "d".attr)("x.a".attr, "d".attr)
+      val correctAnswer = x.groupBy('a, 'b)('a, 'b.as("d"))
+        .join(y, joinType, Some("x.a".attr === "y.a".attr && "d".attr === "y.b".attr))
+        .select("x.a".attr, "d".attr)
+
+      val optimized = Optimize.execute(originalQuery.analyze)
+      comparePlans(optimized, correctAnswer.analyze)
+    }
+  }
+
+  test("SPARK-36194: Remove aggregation from left semi/anti join if it is the sub aggregateExprs") {
+    Seq(LeftSemi, LeftAnti).foreach { joinType =>
+      val originalQuery = x.groupBy('a, 'b)('a, 'b)
+        .join(y, joinType, Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr))
+        .groupBy("x.a".attr, "x.b".attr)("x.a".attr)
+      val correctAnswer = x.groupBy('a, 'b)('a, 'b)
+        .join(y, joinType, Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr))
+        .select("x.a".attr)
+
+      val optimized = Optimize.execute(originalQuery.analyze)
+      comparePlans(optimized, correctAnswer.analyze)
+    }
+  }
+
+  test("SPARK-36194: Transform down to remove more aggregates") {
+    Seq(LeftSemi, LeftAnti).foreach { joinType =>
+      val originalQuery = x.groupBy('a, 'b)('a, 'b)
+        .join(y, joinType, Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr))
+        .groupBy("x.a".attr, "x.b".attr)("x.a".attr, "x.b".attr)
+        .join(y, joinType, Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr))
+        .groupBy("x.a".attr, "x.b".attr)("x.a".attr)
+      val correctAnswer = x.groupBy('a, 'b)('a, 'b)
+        .join(y, joinType, Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr))
+        .select("x.a".attr, "x.b".attr)
+        .join(y, joinType, Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr))
+        .select("x.a".attr)
+
+      val optimized = Optimize.execute(originalQuery.analyze)
+      comparePlans(optimized, correctAnswer.analyze)
+    }
+  }
+
+  test("SPARK-36194: Child distinct keys is the subset of required keys") {
+    val originalQuery = relation
+      .groupBy('a)('a, count('b).as("cnt"))
+      .groupBy('a, 'cnt)('a, 'cnt)
+      .analyze
+    val correctAnswer = relation
+      .groupBy('a)('a, count('b).as("cnt"))
+      .select('a, 'cnt)
+      .analyze
+    val optimized = Optimize.execute(originalQuery)
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("SPARK-36194: Child distinct keys are subsets and aggregateExpressions are foldable") {
+    val originalQuery = x.groupBy('a, 'b)('a, 'b)
+      .join(y, LeftSemi, Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr))
+      .groupBy("x.a".attr, "x.b".attr)(TrueLiteral)
+      .analyze
+    val correctAnswer = x.groupBy('a, 'b)('a, 'b)
+      .join(y, LeftSemi, Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr))
+      .select(TrueLiteral)
+      .analyze
+    val optimized = Optimize.execute(originalQuery)
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("SPARK-36194: Negative case: child distinct keys is not the subset of required keys") {
+    Seq(LeftSemi, LeftAnti).foreach { joinType =>
+      val originalQuery1 = x.groupBy('a, 'b)('a, 'b)
+        .join(y, joinType, Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr))
+        .groupBy("x.a".attr)("x.a".attr)
+        .analyze
+      comparePlans(Optimize.execute(originalQuery1), originalQuery1)
+
+      val originalQuery2 = x.groupBy('a, 'b)('a, 'b)
+        .join(y, joinType, Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr))
+        .groupBy("x.a".attr)(count("x.b".attr))
+        .analyze
+      comparePlans(Optimize.execute(originalQuery2), originalQuery2)
+    }
+  }
+
+  test("SPARK-36194: Negative case: child distinct keys is empty") {
+    val originalQuery = Distinct(x.groupBy('a, 'b)('a, TrueLiteral)).analyze
+    comparePlans(Optimize.execute(originalQuery), originalQuery)
+  }
+
+  test("SPARK-36194: Negative case: Remove aggregation from contains non-deterministic") {
+    val query = relation
+      .groupBy('a)('a, (count('b) + rand(0)).as("cnt"))
+      .groupBy('a, 'cnt)('a, 'cnt)

Review comment:
       We can optimize this case.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a change in pull request #35779: [SPARK-36194][SQL] Add a logical plan visitor to propagate the distinct attributes

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35779:
URL: https://github.com/apache/spark/pull/35779#discussion_r822393629



##########
File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregatesSuite.scala
##########
@@ -166,12 +169,127 @@ class RemoveRedundantAggregatesSuite extends PlanTest {
   }
 
   test("Keep non-redundant aggregate - upper references non-deterministic non-grouping") {
-    val relation = LocalRelation('a.int, 'b.int)
     val query = relation
       .groupBy('a)('a, ('a + rand(0)) as 'c)
       .groupBy('a, 'c)('a, 'c)
       .analyze
     val optimized = Optimize.execute(query)
     comparePlans(optimized, query)
   }
+
+  test("SPARK-36194: Remove aggregation from left semi/anti join if aggregation the same") {
+    Seq(LeftSemi, LeftAnti).foreach { joinType =>
+      val originalQuery = x.groupBy('a, 'b)('a, 'b)
+        .join(y, joinType, Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr))
+        .groupBy("x.a".attr, "x.b".attr)("x.a".attr, "x.b".attr)
+      val correctAnswer = x.groupBy('a, 'b)('a, 'b)
+        .join(y, joinType, Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr))
+        .select("x.a".attr, "x.b".attr)
+
+      val optimized = Optimize.execute(originalQuery.analyze)
+      comparePlans(optimized, correctAnswer.analyze)
+    }
+  }
+
+  test("SPARK-36194: Remove aggregation from left semi/anti join with alias") {
+    Seq(LeftSemi, LeftAnti).foreach { joinType =>
+      val originalQuery = x.groupBy('a, 'b)('a, 'b.as("d"))
+        .join(y, joinType, Some("x.a".attr === "y.a".attr && "d".attr === "y.b".attr))
+        .groupBy("x.a".attr, "d".attr)("x.a".attr, "d".attr)
+      val correctAnswer = x.groupBy('a, 'b)('a, 'b.as("d"))
+        .join(y, joinType, Some("x.a".attr === "y.a".attr && "d".attr === "y.b".attr))
+        .select("x.a".attr, "d".attr)
+
+      val optimized = Optimize.execute(originalQuery.analyze)
+      comparePlans(optimized, correctAnswer.analyze)
+    }
+  }
+
+  test("SPARK-36194: Remove aggregation from left semi/anti join if it is the sub aggregateExprs") {
+    Seq(LeftSemi, LeftAnti).foreach { joinType =>
+      val originalQuery = x.groupBy('a, 'b)('a, 'b)
+        .join(y, joinType, Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr))
+        .groupBy("x.a".attr, "x.b".attr)("x.a".attr)
+      val correctAnswer = x.groupBy('a, 'b)('a, 'b)
+        .join(y, joinType, Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr))
+        .select("x.a".attr)
+
+      val optimized = Optimize.execute(originalQuery.analyze)
+      comparePlans(optimized, correctAnswer.analyze)
+    }
+  }
+
+  test("SPARK-36194: Transform down to remove more aggregates") {
+    Seq(LeftSemi, LeftAnti).foreach { joinType =>
+      val originalQuery = x.groupBy('a, 'b)('a, 'b)
+        .join(y, joinType, Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr))
+        .groupBy("x.a".attr, "x.b".attr)("x.a".attr, "x.b".attr)
+        .join(y, joinType, Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr))
+        .groupBy("x.a".attr, "x.b".attr)("x.a".attr)
+      val correctAnswer = x.groupBy('a, 'b)('a, 'b)
+        .join(y, joinType, Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr))
+        .select("x.a".attr, "x.b".attr)
+        .join(y, joinType, Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr))
+        .select("x.a".attr)
+
+      val optimized = Optimize.execute(originalQuery.analyze)
+      comparePlans(optimized, correctAnswer.analyze)
+    }
+  }
+
+  test("SPARK-36194: Child distinct keys is the subset of required keys") {
+    val originalQuery = relation
+      .groupBy('a)('a, count('b).as("cnt"))
+      .groupBy('a, 'cnt)('a, 'cnt)
+      .analyze
+    val correctAnswer = relation
+      .groupBy('a)('a, count('b).as("cnt"))
+      .select('a, 'cnt)
+      .analyze
+    val optimized = Optimize.execute(originalQuery)
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("SPARK-36194: Child distinct keys are subsets and aggregateExpressions are foldable") {
+    val originalQuery = x.groupBy('a, 'b)('a, 'b)
+      .join(y, LeftSemi, Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr))
+      .groupBy("x.a".attr, "x.b".attr)(TrueLiteral)
+      .analyze
+    val correctAnswer = x.groupBy('a, 'b)('a, 'b)
+      .join(y, LeftSemi, Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr))
+      .select(TrueLiteral)
+      .analyze
+    val optimized = Optimize.execute(originalQuery)
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("SPARK-36194: Negative case: child distinct keys is not the subset of required keys") {
+    Seq(LeftSemi, LeftAnti).foreach { joinType =>
+      val originalQuery1 = x.groupBy('a, 'b)('a, 'b)
+        .join(y, joinType, Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr))
+        .groupBy("x.a".attr)("x.a".attr)
+        .analyze
+      comparePlans(Optimize.execute(originalQuery1), originalQuery1)
+
+      val originalQuery2 = x.groupBy('a, 'b)('a, 'b)
+        .join(y, joinType, Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr))
+        .groupBy("x.a".attr)(count("x.b".attr))
+        .analyze
+      comparePlans(Optimize.execute(originalQuery2), originalQuery2)
+    }
+  }
+
+  test("SPARK-36194: Negative case: child distinct keys is empty") {
+    val originalQuery = Distinct(x.groupBy('a, 'b)('a, TrueLiteral)).analyze

Review comment:
       is it empty? `a` is still the distinct key.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a change in pull request #35779: [SPARK-36194][SQL] Add a logical plan visitor to propagate the distinct attributes

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35779:
URL: https://github.com/apache/spark/pull/35779#discussion_r822391022



##########
File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregatesSuite.scala
##########
@@ -153,8 +150,14 @@ class RemoveRedundantAggregatesSuite extends PlanTest {
     comparePlans(optimized, expected)
   }
 
+  test("Remove redundant aggregate - upper has contains foldable expressions") {
+    val originalQuery = x.groupBy('a, 'b)('a, 'b).groupBy('a)('a, TrueLiteral).analyze

Review comment:
       hmm, how does this work? `groupBy('a)('a, TrueLiteral)` is not group only and not literal-only.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a change in pull request #35779: [SPARK-36194][SQL] Add a logical plan visitor to propagate the distinct attributes

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35779:
URL: https://github.com/apache/spark/pull/35779#discussion_r822388005



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitor.scala
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, ExpressionSet, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter, LeftSemiOrAnti, RightOuter}
+
+/**
+ * A visitor pattern for traversing a [[LogicalPlan]] tree and propagate the distinct attributes.
+ */
+object DistinctKeyVisitor extends LogicalPlanVisitor[Set[ExpressionSet]] {
+
+  private def projectDistinctKeys(
+      keys: Set[ExpressionSet], projectList: Seq[NamedExpression]): Set[ExpressionSet] = {
+    val outputSet = ExpressionSet(projectList.map(_.toAttribute))
+    val distinctKeys = keys.filter(_.subsetOf(outputSet))
+    val aliases = projectList.filter(_.isInstanceOf[Alias])
+    if (aliases.isEmpty) {
+      distinctKeys
+    } else {
+      val aliasedDistinctKeys = keys.map { expressionSet =>
+        expressionSet.map { expression =>
+          expression transform {
+            case expr: Expression =>
+              aliases
+                .collectFirst { case a: Alias if a.child.semanticEquals(expr) => a.toAttribute }
+                .getOrElse(expr)
+          }
+        }
+      }
+      aliasedDistinctKeys.collect {
+        case es: ExpressionSet if es.subsetOf(outputSet) => ExpressionSet(es)
+      } ++ distinctKeys
+    }.filter(_.nonEmpty)
+  }
+
+  override def default(p: LogicalPlan): Set[ExpressionSet] = Set.empty[ExpressionSet]
+
+  override def visitAggregate(p: Aggregate): Set[ExpressionSet] = {
+    val groupingExps = ExpressionSet(p.groupingExpressions) // handle group by a, a
+    projectDistinctKeys(Set(groupingExps), p.aggregateExpressions)
+  }
+
+  override def visitDistinct(p: Distinct): Set[ExpressionSet] = Set(ExpressionSet(p.output))
+
+  override def visitExcept(p: Except): Set[ExpressionSet] =
+    if (!p.isAll && p.deterministic) Set(ExpressionSet(p.output)) else default(p)
+
+  override def visitExpand(p: Expand): Set[ExpressionSet] = default(p)
+
+  override def visitFilter(p: Filter): Set[ExpressionSet] = p.child.distinctKeys
+
+  override def visitGenerate(p: Generate): Set[ExpressionSet] = default(p)
+
+  override def visitGlobalLimit(p: GlobalLimit): Set[ExpressionSet] = {
+    p.maxRows match {
+      case Some(value) if value <= 1 => Set(ExpressionSet(p.output))
+      case _ => p.child.distinctKeys
+    }
+  }
+
+  override def visitIntersect(p: Intersect): Set[ExpressionSet] = {
+    if (!p.isAll && p.deterministic) Set(ExpressionSet(p.output)) else default(p)
+  }
+
+  override def visitJoin(p: Join): Set[ExpressionSet] = {
+    p match {
+      case Join(_, _, LeftSemiOrAnti(_), _, _) =>
+        p.left.distinctKeys
+      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, _, _, left, right, _)
+          if left.distinctKeys.nonEmpty || right.distinctKeys.nonEmpty =>
+        val rightJoinKeySet = ExpressionSet(rightKeys)
+        val leftJoinKeySet = ExpressionSet(leftKeys)
+        joinType match {
+          case Inner if left.distinctKeys.exists(_.subsetOf(leftJoinKeySet)) &&
+            right.distinctKeys.exists(_.subsetOf(rightJoinKeySet)) =>
+            left.distinctKeys ++ right.distinctKeys
+          case Inner | LeftOuter if right.distinctKeys.exists(_.subsetOf(rightJoinKeySet)) =>
+            p.left.distinctKeys
+          case Inner | RightOuter if left.distinctKeys.exists(_.subsetOf(leftJoinKeySet)) =>
+            p.right.distinctKeys
+          case _ =>
+            default(p)
+        }
+      case _ => default(p)
+    }
+  }
+
+  override def visitLocalLimit(p: LocalLimit): Set[ExpressionSet] = p.child.distinctKeys
+
+  override def visitPivot(p: Pivot): Set[ExpressionSet] = default(p)
+
+  override def visitProject(p: Project): Set[ExpressionSet] = {
+    if (p.child.distinctKeys.nonEmpty) {
+      projectDistinctKeys(p.child.distinctKeys.map(ExpressionSet(_)), p.projectList)

Review comment:
       isn't `p.child.distinctKeys` already a `Set[ExpressionSet]`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a change in pull request #35779: [SPARK-36194][SQL] Add a logical plan visitor to propagate the distinct attributes

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35779:
URL: https://github.com/apache/spark/pull/35779#discussion_r822381232



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitor.scala
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, ExpressionSet, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter, LeftSemiOrAnti, RightOuter}
+
+/**
+ * A visitor pattern for traversing a [[LogicalPlan]] tree and propagate the distinct attributes.
+ */
+object DistinctKeyVisitor extends LogicalPlanVisitor[Set[ExpressionSet]] {
+
+  private def projectDistinctKeys(
+      keys: Set[ExpressionSet], projectList: Seq[NamedExpression]): Set[ExpressionSet] = {
+    val outputSet = ExpressionSet(projectList.map(_.toAttribute))
+    val distinctKeys = keys.filter(_.subsetOf(outputSet))
+    val aliases = projectList.filter(_.isInstanceOf[Alias])
+    if (aliases.isEmpty) {
+      distinctKeys
+    } else {
+      val aliasedDistinctKeys = keys.map { expressionSet =>
+        expressionSet.map { expression =>
+          expression transform {
+            case expr: Expression =>
+              aliases
+                .collectFirst { case a: Alias if a.child.semanticEquals(expr) => a.toAttribute }
+                .getOrElse(expr)
+          }
+        }
+      }
+      aliasedDistinctKeys.collect {
+        case es: ExpressionSet if es.subsetOf(outputSet) => ExpressionSet(es)
+      } ++ distinctKeys
+    }.filter(_.nonEmpty)
+  }
+
+  override def default(p: LogicalPlan): Set[ExpressionSet] = Set.empty[ExpressionSet]
+
+  override def visitAggregate(p: Aggregate): Set[ExpressionSet] = {
+    val groupingExps = ExpressionSet(p.groupingExpressions) // handle group by a, a
+    projectDistinctKeys(Set(groupingExps), p.aggregateExpressions)
+  }
+
+  override def visitDistinct(p: Distinct): Set[ExpressionSet] = Set(ExpressionSet(p.output))
+
+  override def visitExcept(p: Except): Set[ExpressionSet] =
+    if (!p.isAll && p.deterministic) Set(ExpressionSet(p.output)) else default(p)
+
+  override def visitExpand(p: Expand): Set[ExpressionSet] = default(p)
+
+  override def visitFilter(p: Filter): Set[ExpressionSet] = p.child.distinctKeys
+
+  override def visitGenerate(p: Generate): Set[ExpressionSet] = default(p)
+
+  override def visitGlobalLimit(p: GlobalLimit): Set[ExpressionSet] = {
+    p.maxRows match {
+      case Some(value) if value <= 1 => Set(ExpressionSet(p.output))
+      case _ => p.child.distinctKeys
+    }
+  }
+
+  override def visitIntersect(p: Intersect): Set[ExpressionSet] = {
+    if (!p.isAll && p.deterministic) Set(ExpressionSet(p.output)) else default(p)

Review comment:
       ditto




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a change in pull request #35779: [SPARK-36194][SQL] Add a logical plan visitor to propagate the distinct attributes

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35779:
URL: https://github.com/apache/spark/pull/35779#discussion_r822381065



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitor.scala
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, ExpressionSet, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter, LeftSemiOrAnti, RightOuter}
+
+/**
+ * A visitor pattern for traversing a [[LogicalPlan]] tree and propagate the distinct attributes.
+ */
+object DistinctKeyVisitor extends LogicalPlanVisitor[Set[ExpressionSet]] {
+
+  private def projectDistinctKeys(
+      keys: Set[ExpressionSet], projectList: Seq[NamedExpression]): Set[ExpressionSet] = {
+    val outputSet = ExpressionSet(projectList.map(_.toAttribute))
+    val distinctKeys = keys.filter(_.subsetOf(outputSet))
+    val aliases = projectList.filter(_.isInstanceOf[Alias])
+    if (aliases.isEmpty) {
+      distinctKeys
+    } else {
+      val aliasedDistinctKeys = keys.map { expressionSet =>
+        expressionSet.map { expression =>
+          expression transform {
+            case expr: Expression =>
+              aliases
+                .collectFirst { case a: Alias if a.child.semanticEquals(expr) => a.toAttribute }
+                .getOrElse(expr)
+          }
+        }
+      }
+      aliasedDistinctKeys.collect {
+        case es: ExpressionSet if es.subsetOf(outputSet) => ExpressionSet(es)
+      } ++ distinctKeys
+    }.filter(_.nonEmpty)
+  }
+
+  override def default(p: LogicalPlan): Set[ExpressionSet] = Set.empty[ExpressionSet]
+
+  override def visitAggregate(p: Aggregate): Set[ExpressionSet] = {
+    val groupingExps = ExpressionSet(p.groupingExpressions) // handle group by a, a
+    projectDistinctKeys(Set(groupingExps), p.aggregateExpressions)
+  }
+
+  override def visitDistinct(p: Distinct): Set[ExpressionSet] = Set(ExpressionSet(p.output))
+
+  override def visitExcept(p: Except): Set[ExpressionSet] =
+    if (!p.isAll && p.deterministic) Set(ExpressionSet(p.output)) else default(p)

Review comment:
       why `p.deterministic` is required?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wangyum commented on a change in pull request #35779: [SPARK-36194][SQL] Add a logical plan visitor to propagate the distinct attributes

Posted by GitBox <gi...@apache.org>.
wangyum commented on a change in pull request #35779:
URL: https://github.com/apache/spark/pull/35779#discussion_r824422599



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala
##########
@@ -47,6 +47,15 @@ object RemoveRedundantAggregates extends Rule[LogicalPlan] with AliasHelper {
       } else {
         newAggregate
       }
+
+    case agg @ Aggregate(groupingExps, _, child)
+        if agg.groupOnly && child.distinctKeys.exists(_.subsetOf(ExpressionSet(groupingExps))) =>
+      Project(agg.aggregateExpressions, child)
+
+    case agg @ Aggregate(groupingExps, aggregateExps, child)

Review comment:
       Yes, we do not need it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org