Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/05/13 00:25:44 UTC

[GitHub] [spark] vli-databricks opened a new pull request, #36527: [SPARK-39169][SQL] Optimize FIRST when used as a single aggregate fun…

vli-databricks opened a new pull request, #36527:
URL: https://github.com/apache/spark/pull/36527

   …ction
   
   ### What changes were proposed in this pull request?
   Rewrite an aggregate with a single `FIRST` function when grouping is absent. In that case the query is equivalent to a projection with `LIMIT 1`.
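
As a rough illustration of the claimed equivalence, the sketch below models a table as a plain Scala sequence and compares the two query shapes. `FirstVsLimitSketch`, `Row`, and both function names are made up for this example and are not Spark APIs; `None` stands in for SQL NULL.

```scala
// Toy model only: a "table" is a sequence of rows, each holding one nullable column.
object FirstVsLimitSketch {
  type Row = Option[String] // None stands in for SQL NULL

  // SELECT FIRST(col) FROM table
  // A global aggregate (no GROUP BY) yields exactly one output row.
  def firstAggregate(table: Seq[Row]): Seq[Row] =
    Seq(table.headOption.flatten)

  // SELECT col FROM table LIMIT 1
  // A projection followed by a limit yields at most one output row.
  def projectLimitOne(table: Seq[Row]): Seq[Row] =
    table.take(1)
}
```

In this toy model the two forms agree whenever the input has at least one row, including when that row is NULL. On empty input they differ: the global aggregate still produces a single NULL row, while the limited projection produces no rows.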
   
   ### Why are the changes needed?
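   Avoid scanning large tables: with a `LIMIT 1` in the plan, execution can stop after the first row instead of reading the whole input.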
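
To make the cost argument concrete, here is a minimal sketch that counts how many rows each strategy pulls from a scan. It assumes an aggregate-style `FIRST` that drains its input, and every name in it (`ScanCostSketch`, `countingScan`, `firstByDraining`, `firstByLimit`) is hypothetical; none of this is Spark code.

```scala
object ScanCostSketch {
  // Wraps the rows in an iterator that counts how many rows were actually pulled.
  private def countingScan(rows: Seq[String]): (Iterator[String], () => Int) = {
    var read = 0
    val it = rows.iterator.map { r => read += 1; r }
    (it, () => read)
  }

  // Aggregate-style FIRST (assumed behavior): keeps the first value but still
  // consumes every remaining row. Returns (result, rowsRead).
  def firstByDraining(rows: Seq[String]): (Option[String], Int) = {
    val (it, read) = countingScan(rows)
    var first: Option[String] = None
    it.foreach { r => if (first.isEmpty) first = Some(r) }
    (first, read())
  }

  // LIMIT 1 style: pulls at most one row and stops. Returns (result, rowsRead).
  def firstByLimit(rows: Seq[String]): (Option[String], Int) = {
    val (it, read) = countingScan(rows)
    val first = if (it.hasNext) Some(it.next()) else None
    (first, read())
  }
}
```

Both functions return the same value; only the number of rows read differs, which is the saving the description is after.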
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   Unit tests in `RewriteNonAggregateFirstSuite`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] vli-databricks commented on a diff in pull request #36527: [SPARK-39169][SQL] Optimize FIRST when used as a single aggregate fun…

Posted by GitBox <gi...@apache.org>.
vli-databricks commented on code in PR #36527:
URL: https://github.com/apache/spark/pull/36527#discussion_r872918218


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RewriteNonAggregateFirstSuite.scala:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.aggregate.First
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical._
+
+class RewriteNonAggregateFirstSuite extends PlanTest {
+  val testRelation: LocalRelation = LocalRelation('a.string, 'b.string)

Review Comment:
   Fixed, thank you for pointing this out!





[GitHub] [spark] vli-databricks commented on a diff in pull request #36527: [SPARK-39169][SQL] Optimize FIRST when used as a single aggregate fun…

Posted by GitBox <gi...@apache.org>.
vli-databricks commented on code in PR #36527:
URL: https://github.com/apache/spark/pull/36527#discussion_r872664126


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -196,7 +196,8 @@ abstract class Optimizer(catalogManager: CatalogManager)
       ReplaceDeduplicateWithAggregate) ::
     Batch("Aggregate", fixedPoint,
       RemoveLiteralFromGroupExpressions,
-      RemoveRepetitionFromGroupExpressions) :: Nil ++
+      RemoveRepetitionFromGroupExpressions,
+      RewriteNonAggregateFirst) :: Nil ++

Review Comment:
   No particular reason; it seemed like a fitting batch for an aggregate optimization.





[GitHub] [spark] dtenedor commented on a diff in pull request #36527: [SPARK-39169][SQL] Optimize FIRST when used as a single aggregate fun…

Posted by GitBox <gi...@apache.org>.
dtenedor commented on code in PR #36527:
URL: https://github.com/apache/spark/pull/36527#discussion_r872694267


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteNonAggregateFirst.scala:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, First}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.FIRST
+import org.apache.spark.sql.types.IntegerType
+
+/**
+ * Rewrite aggregate plan with a single [[First]] function when grouping is absent. In such a case
+ * the query is equivalent to simple projection with limit 1.
+ *
+ * Input Pseudo-Query:
+ * {{{
+ *   SELECT FIRST(col) FROM table
+ * }}}
+ *
+ * Rewritten Query:
+ * {{{
+ *   SELECT col FROM table LIMIT 1

Review Comment:
   Good question. Yes, this optimization only applies when the aggregate consists of one or more `FIRST` functions, none of them uses `IGNORE NULLS`, and no other aggregate functions are present. For other uses of the `FIRST` aggregate function, we plan to update query execution to stop consuming input after receiving the first value when possible (the current implementation consumes all the remaining values and discards them).
   
   The advantage of generating a logical `LIMIT 1` in this case is that the optimizer can recursively push it down. This can make execution more efficient because, for example, data source scans can prune partitions or apply other improvements outside the aggregation operator itself.
   
   Re: the tradeoff between complexity and performance improvement, please share opinions if you have any.
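
The `IGNORE NULLS` exclusion can be illustrated with a small model of the case the rule's doc comment warns about. This is only a sketch under simplifying assumptions: a table is a Scala sequence, `None` stands in for NULL, and `IgnoreNullsSketch` and its functions are hypothetical names, not Spark APIs.

```scala
object IgnoreNullsSketch {
  type Row = Option[String] // None stands in for SQL NULL

  // SELECT FIRST(col) IGNORE NULLS FROM table
  // Global aggregate: always one row, NULL when no non-null value exists.
  def firstIgnoreNulls(table: Seq[Row]): Seq[Row] =
    Seq(table.flatten.headOption)

  // SELECT col FROM table WHERE col IS NOT NULL LIMIT 1
  // Filter plus limit: zero rows when every value is NULL.
  def filterNotNullLimitOne(table: Seq[Row]): Seq[Row] =
    table.filter(_.isDefined).take(1)
}
```

When at least one value is non-null both functions return the same single row. When every value is NULL, the aggregate returns one NULL row and the filtered projection returns nothing, so the two plans are not interchangeable.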





[GitHub] [spark] AmplabJenkins commented on pull request #36527: [SPARK-39169][SQL] Optimize FIRST when used as a single aggregate fun…

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on PR #36527:
URL: https://github.com/apache/spark/pull/36527#issuecomment-1126524461

   Can one of the admins verify this patch?




[GitHub] [spark] vli-databricks commented on a diff in pull request #36527: [SPARK-39169][SQL] Optimize FIRST when used as a single aggregate fun…

Posted by GitBox <gi...@apache.org>.
vli-databricks commented on code in PR #36527:
URL: https://github.com/apache/spark/pull/36527#discussion_r872918534


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RewriteNonAggregateFirstSuite.scala:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.aggregate.First
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical._
+
+class RewriteNonAggregateFirstSuite extends PlanTest {
+  val testRelation: LocalRelation = LocalRelation('a.string, 'b.string)
+
+  private def checkRewrite(rewrite: LogicalPlan): Unit = rewrite match {
+    case GlobalLimit(_, LocalLimit(_, Project(_, _))) =>
+    case _ => fail(s"Plan is not rewritten:\n$rewrite")
+  }
+
+  test("no group by and single first aggregate") {
+    val input = testRelation.select(first('a)).analyze
+    val rewrite = RewriteNonAggregateFirst(input)
+    checkRewrite(rewrite)
+  }
+
+  test("single first aggregate with group by") {
+    val input = testRelation
+      .groupBy('a)(first('b))
+      .analyze
+    val rewrite = RewriteNonAggregateFirst(input)
+    comparePlans(input, rewrite)
+  }
+
+  test("multiple aggregates with group by") {
+    val input = testRelation
+      .groupBy('a)(
+        first('a).as('agg1),
+        max('b).as('agg2))
+      .analyze
+    val rewrite = RewriteNonAggregateFirst(input)
+    comparePlans(input, rewrite)
+  }
+
+  test("multiple aggregates without group by") {
+    val input = testRelation
+      .select(first('a).as('agg1), max('b).as('agg2))
+      .analyze
+    val rewrite = RewriteNonAggregateFirst(input)
+    comparePlans(input, rewrite)
+  }
+
+  test("no group by and single first aggregate with ignore nulls") {
+    val input = testRelation
+      .select(First('a, ignoreNulls = true).toAggregateExpression())

Review Comment:
   Done





[GitHub] [spark] dtenedor commented on a diff in pull request #36527: [SPARK-39169][SQL] Optimize FIRST when used as a single aggregate fun…

Posted by GitBox <gi...@apache.org>.
dtenedor commented on code in PR #36527:
URL: https://github.com/apache/spark/pull/36527#discussion_r871913208


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -196,7 +196,8 @@ abstract class Optimizer(catalogManager: CatalogManager)
       ReplaceDeduplicateWithAggregate) ::
     Batch("Aggregate", fixedPoint,
       RemoveLiteralFromGroupExpressions,
-      RemoveRepetitionFromGroupExpressions) :: Nil ++
+      RemoveRepetitionFromGroupExpressions,
+      RewriteNonAggregateFirst) :: Nil ++

Review Comment:
   Just curious, is there a reason we elect to put the new rule here vs. some other location in the optimizer rule batches?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteNonAggregateFirst.scala:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, First}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.FIRST
+import org.apache.spark.sql.types.IntegerType
+
+/**
+ * Rewrite aggregate plan with a single [[First]] function when grouping is absent. In such a case
+ * the query is equivalent to simple projection with limit 1.
+ *
+ * Input Pseudo-Query:
+ * {{{
+ *   SELECT FIRST(col) FROM table
+ * }}}
+ *
+ * Rewritten Query:
+ * {{{
+ *   SELECT col FROM table LIMIT 1
+ * }}}
+ *
+ * Note that using IGNORE NULLS with [[First]] blocks rewrite logic since projection with NOT NULL
+ * filter might return different result than [[First]] if all values are NULL.
+ */
+object RewriteNonAggregateFirst extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.transformWithPruning(_.containsPattern(FIRST), ruleId) {
+      case agg: Aggregate if isNonAggregateFirst(agg) =>
+        rewriteNonAggregateFirstToProject(agg)
+    }
+  }
+
+  private def isNonAggregateFirst(agg: Aggregate): Boolean = {

Review Comment:
   We only call this method once and the implementation is relatively simple. Maybe we could just inline it at the call site instead?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteNonAggregateFirst.scala:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, First}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.FIRST
+import org.apache.spark.sql.types.IntegerType
+
+/**
+ * Rewrite aggregate plan with a single [[First]] function when grouping is absent. In such a case
+ * the query is equivalent to simple projection with limit 1.
+ *
+ * Input Pseudo-Query:
+ * {{{
+ *   SELECT FIRST(col) FROM table
+ * }}}
+ *
+ * Rewritten Query:
+ * {{{
+ *   SELECT col FROM table LIMIT 1
+ * }}}
+ *
+ * Note that using IGNORE NULLS with [[First]] blocks rewrite logic since projection with NOT NULL
+ * filter might return different result than [[First]] if all values are NULL.
+ */
+object RewriteNonAggregateFirst extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.transformWithPruning(_.containsPattern(FIRST), ruleId) {
+      case agg: Aggregate if isNonAggregateFirst(agg) =>
+        rewriteNonAggregateFirstToProject(agg)
+    }
+  }
+
+  private def isNonAggregateFirst(agg: Aggregate): Boolean = {
+    agg.aggregateExpressions.length == 1 && agg.groupingExpressions.isEmpty
+  }
+
+  private def rewriteNonAggregateFirstToProject(agg: Aggregate): LogicalPlan = {
+    var skipRewrite = false
+    val projectList = agg.aggregateExpressions.map {
+      case alias: Alias =>

Review Comment:
   I think we could simplify this by matching the `Alias` <- `First` sequence in one go, e.g. `case a @ Alias(f: First, _) if !f.ignoreNulls => Some(a.copy(child = f.child))`, and then catching all other cases with `case _ => None`. Then at the end you can check `if (projectList.forall(_.isDefined)) { ... projectList.map(_.get) } else { agg }`.
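
A minimal, self-contained sketch of that suggestion follows. The expression and plan types are hypothetical miniatures defined only for this example (they share names with Catalyst classes but are not Spark's), and `Limit` stands in for whatever limit operator the real rule would emit.

```scala
object OptionRewriteSketch {
  // Hypothetical miniature expression types, standing in for Catalyst's.
  sealed trait Expr
  case class Col(name: String) extends Expr
  case class First(child: Expr, ignoreNulls: Boolean) extends Expr
  case class Max(child: Expr) extends Expr
  case class Alias(child: Expr, name: String) extends Expr

  // Hypothetical miniature plan types.
  sealed trait Plan
  case class Relation(name: String) extends Plan
  case class Aggregate(grouping: Seq[Expr], aggregates: Seq[Expr], child: Plan) extends Plan
  case class Project(projectList: Seq[Expr], child: Plan) extends Plan
  case class Limit(n: Int, child: Plan) extends Plan

  def rewrite(plan: Plan): Plan = plan match {
    case agg @ Aggregate(grouping, aggregates, child) if grouping.isEmpty =>
      // Each aggregate maps to Some(projection) if it is an aliased FIRST
      // without IGNORE NULLS, and to None otherwise.
      val projectList: Seq[Option[Expr]] = aggregates.map {
        case a @ Alias(f: First, _) if !f.ignoreNulls => Some(a.copy(child = f.child))
        case _ => None
      }
      // Rewrite only if every aggregate qualified; otherwise keep the plan as is.
      if (projectList.nonEmpty && projectList.forall(_.isDefined)) {
        Limit(1, Project(projectList.map(_.get), child))
      } else {
        agg
      }
    case other => other
  }
}
```

Because the decision is made with `forall` over the whole list, the same code covers several `First` aggregates at once and leaves the plan untouched as soon as one other aggregate is present.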



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteNonAggregateFirst.scala:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, First}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.FIRST
+import org.apache.spark.sql.types.IntegerType
+
+/**
+ * Rewrite aggregate plan with a single [[First]] function when grouping is absent. In such a case
+ * the query is equivalent to simple projection with limit 1.
+ *
+ * Input Pseudo-Query:
+ * {{{
+ *   SELECT FIRST(col) FROM table
+ * }}}
+ *
+ * Rewritten Query:
+ * {{{
+ *   SELECT col FROM table LIMIT 1
+ * }}}
+ *
+ * Note that using IGNORE NULLS with [[First]] blocks rewrite logic since projection with NOT NULL
+ * filter might return different result than [[First]] if all values are NULL.
+ */
+object RewriteNonAggregateFirst extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.transformWithPruning(_.containsPattern(FIRST), ruleId) {
+      case agg: Aggregate if isNonAggregateFirst(agg) =>
+        rewriteNonAggregateFirstToProject(agg)
+    }
+  }
+
+  private def isNonAggregateFirst(agg: Aggregate): Boolean = {
+    agg.aggregateExpressions.length == 1 && agg.groupingExpressions.isEmpty
+  }
+
+  private def rewriteNonAggregateFirstToProject(agg: Aggregate): LogicalPlan = {

Review Comment:
   Same here; we can probably just put all the code in the `apply` method.



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RewriteNonAggregateFirstSuite.scala:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.aggregate.First
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical._
+
+class RewriteNonAggregateFirstSuite extends PlanTest {
+  val testRelation: LocalRelation = LocalRelation('a.string, 'b.string)
+
+  private def checkRewrite(rewrite: LogicalPlan): Unit = rewrite match {
+    case GlobalLimit(_, LocalLimit(_, Project(_, _))) =>
+    case _ => fail(s"Plan is not rewritten:\n$rewrite")
+  }
+
+  test("no group by and single first aggregate") {
+    val input = testRelation.select(first('a)).analyze
+    val rewrite = RewriteNonAggregateFirst(input)
+    checkRewrite(rewrite)
+  }
+
+  test("single first aggregate with group by") {
+    val input = testRelation
+      .groupBy('a)(first('b))
+      .analyze
+    val rewrite = RewriteNonAggregateFirst(input)
+    comparePlans(input, rewrite)
+  }
+
+  test("multiple aggregates with group by") {
+    val input = testRelation
+      .groupBy('a)(
+        first('a).as('agg1),
+        max('b).as('agg2))
+      .analyze
+    val rewrite = RewriteNonAggregateFirst(input)
+    comparePlans(input, rewrite)
+  }
+
+  test("multiple aggregates without group by") {
+    val input = testRelation
+      .select(first('a).as('agg1), max('b).as('agg2))
+      .analyze
+    val rewrite = RewriteNonAggregateFirst(input)
+    comparePlans(input, rewrite)
+  }
+
+  test("no group by and single first aggregate with ignore nulls") {
+    val input = testRelation
+      .select(First('a, ignoreNulls = true).toAggregateExpression())

Review Comment:
   Add another test with multiple `First` aggregates (to show the optimization applies), plus one with multiple `First` aggregates and one non-`First` aggregate (to show the optimization does not apply)?



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RewriteNonAggregateFirstSuite.scala:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.aggregate.First
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical._
+
+class RewriteNonAggregateFirstSuite extends PlanTest {
+  val testRelation: LocalRelation = LocalRelation('a.string, 'b.string)

Review Comment:
   Symbol literals are banned by the Databricks style guide: https://github.com/databricks/scala-style-guide#symbol-literals



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteNonAggregateFirst.scala:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, First}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.FIRST
+import org.apache.spark.sql.types.IntegerType
+
+/**
+ * Rewrite aggregate plan with a single [[First]] function when grouping is absent. In such a case

Review Comment:
   We can apply this optimization when there is more than one `FIRST` aggregate function, correct? We just can't have any other aggregate functions. Maybe update the comment and example to mention this, update the rule implementation to allow this case as well, and add a couple of unit tests with multiple `FIRST` aggregate functions?





[GitHub] [spark] amaliujia commented on a diff in pull request #36527: [SPARK-39169][SQL] Optimize FIRST when used as a single aggregate fun…

Posted by GitBox <gi...@apache.org>.
amaliujia commented on code in PR #36527:
URL: https://github.com/apache/spark/pull/36527#discussion_r872692808


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteNonAggregateFirst.scala:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, First}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.FIRST
+import org.apache.spark.sql.types.IntegerType
+
+/**
+ * Rewrite aggregate plan with a single [[First]] function when grouping is absent. In such a case
+ * the query is equivalent to simple projection with limit 1.
+ *
+ * Input Pseudo-Query:
+ * {{{
+ *   SELECT FIRST(col) FROM table
+ * }}}
+ *
+ * Rewritten Query:
+ * {{{
+ *   SELECT col FROM table LIMIT 1

Review Comment:
   Another way to think about it: the rule will be triggered on every query, while only a very small set of queries will benefit?





[GitHub] [spark] vli-databricks closed pull request #36527: [SPARK-39169][SQL] Optimize FIRST when used as a single aggregate fun…

Posted by GitBox <gi...@apache.org>.
vli-databricks closed pull request #36527: [SPARK-39169][SQL] Optimize FIRST when used as a single aggregate fun…
URL: https://github.com/apache/spark/pull/36527




[GitHub] [spark] amaliujia commented on a diff in pull request #36527: [SPARK-39169][SQL] Optimize FIRST when used as a single aggregate fun…

Posted by GitBox <gi...@apache.org>.
amaliujia commented on code in PR #36527:
URL: https://github.com/apache/spark/pull/36527#discussion_r872703441


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteNonAggregateFirst.scala:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, First}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.FIRST
+import org.apache.spark.sql.types.IntegerType
+
+/**
+ * Rewrite aggregate plan with a single [[First]] function when grouping is absent. In such a case
+ * the query is equivalent to simple projection with limit 1.
+ *
+ * Input Pseudo-Query:
+ * {{{
+ *   SELECT FIRST(col) FROM table
+ * }}}
+ *
+ * Rewritten Query:
+ * {{{
+ *   SELECT col FROM table LIMIT 1

Review Comment:
   Thanks for the insight into the potential performance gain!
   
   I see the tradeoff better now. I don't have a strong opinion on which one is better. Some systems are more sensitive to query compilation latency.
   
   For the common workloads that Spark SQL deals with, I believe compilation latency matters less than faster execution on large data sets.





[GitHub] [spark] vli-databricks commented on a diff in pull request #36527: [SPARK-39169][SQL] Optimize FIRST when used as a single aggregate fun…

Posted by GitBox <gi...@apache.org>.
vli-databricks commented on code in PR #36527:
URL: https://github.com/apache/spark/pull/36527#discussion_r872918052


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteNonAggregateFirst.scala:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, First}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.FIRST
+import org.apache.spark.sql.types.IntegerType
+
+/**
+ * Rewrite aggregate plan with a single [[First]] function when grouping is absent. In such a case

Review Comment:
   Yes, thanks for catching this. Fixed comment and added tests.





[GitHub] [spark] vli-databricks commented on a diff in pull request #36527: [SPARK-39169][SQL] Optimize FIRST when used as a single aggregate fun…

Posted by GitBox <gi...@apache.org>.
vli-databricks commented on code in PR #36527:
URL: https://github.com/apache/spark/pull/36527#discussion_r872917991


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteNonAggregateFirst.scala:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, First}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.FIRST
+import org.apache.spark.sql.types.IntegerType
+
+/**
+ * Rewrite aggregate plan with a single [[First]] function when grouping is absent. In such a case
+ * the query is equivalent to simple projection with limit 1.
+ *
+ * Input Pseudo-Query:
+ * {{{
+ *   SELECT FIRST(col) FROM table
+ * }}}
+ *
+ * Rewritten Query:
+ * {{{
+ *   SELECT col FROM table LIMIT 1
+ * }}}
+ *
+ * Note that using IGNORE NULLS with [[First]] blocks rewrite logic since projection with NOT NULL
+ * filter might return different result than [[First]] if all values are NULL.
+ */
+object RewriteNonAggregateFirst extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.transformWithPruning(_.containsPattern(FIRST), ruleId) {
+      case agg: Aggregate if isNonAggregateFirst(agg) =>
+        rewriteNonAggregateFirstToProject(agg)
+    }
+  }
+
+  private def isNonAggregateFirst(agg: Aggregate): Boolean = {

Review Comment:
   It is slightly expanded to include multiple `First` and `ignoreNulls`.





[GitHub] [spark] amaliujia commented on a diff in pull request #36527: [SPARK-39169][SQL] Optimize FIRST when used as a single aggregate fun…

Posted by GitBox <gi...@apache.org>.
amaliujia commented on code in PR #36527:
URL: https://github.com/apache/spark/pull/36527#discussion_r872703441


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteNonAggregateFirst.scala:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, First}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.FIRST
+import org.apache.spark.sql.types.IntegerType
+
+/**
+ * Rewrite aggregate plan with a single [[First]] function when grouping is absent. In such a case
+ * the query is equivalent to simple projection with limit 1.
+ *
+ * Input Pseudo-Query:
+ * {{{
+ *   SELECT FIRST(col) FROM table
+ * }}}
+ *
+ * Rewritten Query:
+ * {{{
+ *   SELECT col FROM table LIMIT 1

Review Comment:
   Thanks for the insight into the potential performance gain!
   
   I see the tradeoff better now. I don't have a strong opinion on which one is better. Some systems are more sensitive to query compilation latency.
   
   For the common workloads that Spark SQL deals with, I believe compilation latency matters less than faster execution on large data sets.





[GitHub] [spark] dtenedor commented on a diff in pull request #36527: [SPARK-39169][SQL] Optimize FIRST when used as a single aggregate fun…

Posted by GitBox <gi...@apache.org>.
dtenedor commented on code in PR #36527:
URL: https://github.com/apache/spark/pull/36527#discussion_r872694267


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteNonAggregateFirst.scala:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, First}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.FIRST
+import org.apache.spark.sql.types.IntegerType
+
+/**
+ * Rewrite aggregate plan with a single [[First]] function when grouping is absent. In such a case
+ * the query is equivalent to simple projection with limit 1.
+ *
+ * Input Pseudo-Query:
+ * {{{
+ *   SELECT FIRST(col) FROM table
+ * }}}
+ *
+ * Rewritten Query:
+ * {{{
+ *   SELECT col FROM table LIMIT 1

Review Comment:
   Good question, yes, this optimization only covers cases of one or more `FIRST` aggregate functions and only when `IGNORE NULLS` is not present, and no other aggregate functions are present. For other cases of the `FIRST` aggregate function, we plan to update the query execution to stop consuming input values after receiving the first one when possible (the current implementation consumes all the remaining values and discards them).
   
   The advantage of generating a logical `LIMIT 1` in this case is that the optimizer can recursively push it down. This can result in more efficient execution since e.g. data source scans can prune partitions or do other improvements outside of the aggregation operator itself.
   
   Re: complexity tradeoff vs. performance improvement, please leave opinions if needed. (Note that pruning on `_.containsPattern(FIRST)` will skip the rule iteration unless the query plan contains the FIRST aggregate function.)
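
To make the `IGNORE NULLS` restriction concrete, here is a minimal sketch in plain Scala. It is a toy model, not Catalyst code: `Column`, `first`, `firstIgnoreNulls`, and `firstOverLimit1` are hypothetical names, and the model assumes `FIRST` returns the value of the first row it sees.

```scala
// Toy model (not Spark code): a column is a sequence of nullable values.
object FirstVsLimit {
  type Column = Seq[Option[Int]]

  // FIRST(col) without IGNORE NULLS: the value of the first row, NULL included.
  def first(col: Column): Option[Int] = col.headOption.flatten

  // FIRST(col) IGNORE NULLS: the first non-NULL value, if there is one.
  def firstIgnoreNulls(col: Column): Option[Int] = col.flatten.headOption

  // Assumed rewritten shape: apply LIMIT 1 to the input, then evaluate FIRST.
  def firstOverLimit1(col: Column): Option[Int] = first(col.take(1))

  def main(args: Array[String]): Unit = {
    val col: Column = Seq(None, Some(7), Some(9))
    println(first(col))                    // None
    println(firstOverLimit1(col))          // None, same as first(col)
    println(firstIgnoreNulls(col))         // Some(7)
    println(firstIgnoreNulls(col.take(1))) // None, so LIMIT 1 would change the answer
  }
}
```

In this model the limit is safe for plain `FIRST` because only the first row is ever inspected, while `IGNORE NULLS` may need to look past it.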





[GitHub] [spark] dtenedor commented on a diff in pull request #36527: [SPARK-39169][SQL] Optimize FIRST when used as a single aggregate fun…

Posted by GitBox <gi...@apache.org>.
dtenedor commented on code in PR #36527:
URL: https://github.com/apache/spark/pull/36527#discussion_r872705567


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteNonAggregateFirst.scala:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, First}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.FIRST
+import org.apache.spark.sql.types.IntegerType
+
+/**
+ * Rewrite aggregate plan with a single [[First]] function when grouping is absent. In such a case
+ * the query is equivalent to simple projection with limit 1.
+ *
+ * Input Pseudo-Query:
+ * {{{
+ *   SELECT FIRST(col) FROM table
+ * }}}
+ *
+ * Rewritten Query:
+ * {{{
+ *   SELECT col FROM table LIMIT 1

Review Comment:
   Yeah, the optimization triggers in a small subset of cases. So we don't want to add a lot of complexity or compilation time. With the ideas suggested in this review, I think we can simplify the code for the rule implementation. And the pruning takes place in O(1) with bitmaps so it should not impose any compilation time unless the query actually contains FIRST aggregate function(s), and even then, very little.
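
The pruning argument can be illustrated with a small, self-contained sketch. This is not the Catalyst implementation: `Node`, the integer pattern ids, and this two-argument `transformWithPruning` are hypothetical stand-ins (the real call in the rule also passes a `ruleId`). The idea assumed here is that every node caches a bit set of the patterns found anywhere in its subtree, so the check is a single bit lookup.

```scala
import scala.collection.immutable.BitSet

// Toy model of pattern-based pruning; names and ids are illustrative only.
object PatternPruning {
  val FIRST = 0
  val FILTER = 1

  // Each node caches the union of its own pattern bits and its children's bits.
  final case class Node(name: String, own: BitSet, children: Seq[Node] = Nil) {
    val patterns: BitSet = children.foldLeft(own)((acc, c) => acc | c.patterns)
    def containsPattern(p: Int): Boolean = patterns(p)
  }

  // Skip a whole subtree when the cached bits say the pattern cannot occur in it.
  def transformWithPruning(node: Node, cond: Node => Boolean)(
      rule: PartialFunction[Node, Node]): Node = {
    if (!cond(node)) {
      node // pruned: the subtree is returned untouched
    } else {
      val newChildren = node.children.map(c => transformWithPruning(c, cond)(rule))
      rule.applyOrElse(node.copy(children = newChildren), (n: Node) => n)
    }
  }

  // Stand-in rule: rename the aggregate to show that it was visited.
  val rule: PartialFunction[Node, Node] = {
    case n if n.name == "Aggregate" => n.copy(name = "AggregateOverLimit")
  }

  def main(args: Array[String]): Unit = {
    val scan = Node("Scan", BitSet.empty)
    val noFirst = Node("Filter", BitSet(FILTER), Seq(scan))
    val withFirst = Node("Aggregate", BitSet(FIRST), Seq(scan))
    println(transformWithPruning(noFirst, _.containsPattern(FIRST))(rule) eq noFirst) // true
    println(transformWithPruning(withFirst, _.containsPattern(FIRST))(rule).name)     // AggregateOverLimit
  }
}
```

A plan without the `FIRST` bit comes back as the same object, so in this model the rule costs one lookup for queries it cannot affect.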





[GitHub] [spark] dtenedor commented on a diff in pull request #36527: [SPARK-39169][SQL] Optimize FIRST when used as a single aggregate fun…

Posted by GitBox <gi...@apache.org>.
dtenedor commented on code in PR #36527:
URL: https://github.com/apache/spark/pull/36527#discussion_r872705861


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteNonAggregateFirst.scala:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, First}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.FIRST
+import org.apache.spark.sql.types.IntegerType
+
+/**
+ * Rewrite aggregate plan with a single [[First]] function when grouping is absent. In such a case
+ * the query is equivalent to simple projection with limit 1.
+ *
+ * Input Pseudo-Query:
+ * {{{
+ *   SELECT FIRST(col) FROM table
+ * }}}
+ *
+ * Rewritten Query:
+ * {{{
+ *   SELECT col FROM table LIMIT 1

Review Comment:
   Yeah, the optimization triggers in a small subset of cases. So we don't want to add a lot of complexity or compilation time. With the ideas suggested in this review, I think we can simplify the code for the rule implementation. And the pruning takes place in O(1) with bitmaps so it should not impose any compilation time unless the query actually contains FIRST aggregate function(s), and even then, very little.





[GitHub] [spark] amaliujia commented on a diff in pull request #36527: [SPARK-39169][SQL] Optimize FIRST when used as a single aggregate fun…

Posted by GitBox <gi...@apache.org>.
amaliujia commented on code in PR #36527:
URL: https://github.com/apache/spark/pull/36527#discussion_r872703441


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteNonAggregateFirst.scala:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, First}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.FIRST
+import org.apache.spark.sql.types.IntegerType
+
+/**
+ * Rewrite aggregate plan with a single [[First]] function when grouping is absent. In such a case
+ * the query is equivalent to simple projection with limit 1.
+ *
+ * Input Pseudo-Query:
+ * {{{
+ *   SELECT FIRST(col) FROM table
+ * }}}
+ *
+ * Rewritten Query:
+ * {{{
+ *   SELECT col FROM table LIMIT 1

Review Comment:
   Thanks for the insight into the potential performance gain!
   
   I see the tradeoff better now. I don't have a strong opinion on which one is better. Some systems are more sensitive to query compilation latency.
   
   For the common workloads that Spark SQL deals with, I believe compilation latency matters less than faster execution on large data sets.





[GitHub] [spark] amaliujia commented on a diff in pull request #36527: [SPARK-39169][SQL] Optimize FIRST when used as a single aggregate fun…

Posted by GitBox <gi...@apache.org>.
amaliujia commented on code in PR #36527:
URL: https://github.com/apache/spark/pull/36527#discussion_r872689693


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteNonAggregateFirst.scala:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, First}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.FIRST
+import org.apache.spark.sql.types.IntegerType
+
+/**
+ * Rewrite aggregate plan with a single [[First]] function when grouping is absent. In such a case
+ * the query is equivalent to simple projection with limit 1.
+ *
+ * Input Pseudo-Query:
+ * {{{
+ *   SELECT FIRST(col) FROM table
+ * }}}
+ *
+ * Rewritten Query:
+ * {{{
+ *   SELECT col FROM table LIMIT 1

Review Comment:
   Why do we want this optimization? Is the purpose to implement one fewer function? Are there other similar aggregate functions to which the same idea applies?
   
   My concern is that adding a rule to the optimization process will increase the complexity of the SQL compiler engine if the only benefit is to save implementing a single aggregate function. Conceptually, having an aggregate function implemented is much simpler.





[GitHub] [spark] dtenedor commented on a diff in pull request #36527: [SPARK-39169][SQL] Optimize FIRST when used as a single aggregate fun…

Posted by GitBox <gi...@apache.org>.
dtenedor commented on code in PR #36527:
URL: https://github.com/apache/spark/pull/36527#discussion_r873944972


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RewriteNonAggregateFirstSuite.scala:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Max}
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical._
+
+class RewriteNonAggregateFirstSuite extends PlanTest {
+  val testRelation: LocalRelation = LocalRelation($"a".string, $"b".string)
+
+  private def checkRewrite(rewrite: LogicalPlan): Unit = rewrite match {
+    case Aggregate(_, _, GlobalLimit(_, _)) =>
+    case _ => fail(s"Plan is not rewritten:\n$rewrite")
+  }
+
+  test("single FIRST aggregate and no group by") {
+    val input = testRelation.select(
+      First($"a", ignoreNulls = false).toAggregateExpression()).analyze
+    val rewrite = RewriteNonAggregateFirst(input.analyze)
+    checkRewrite(rewrite)
+  }
+
+  test("multiple FIRST aggregates and no group by") {
+    val input = testRelation.select(
+      First($"a", ignoreNulls = false).toAggregateExpression(),
+      First($"b", ignoreNulls = false).toAggregateExpression()).analyze
+    val rewrite = RewriteNonAggregateFirst(input.analyze)
+    checkRewrite(rewrite)
+  }
+
+  test("ignoreNulls set to true blocks rewrite") {
+    val input = testRelation.select(
+      First($"a", ignoreNulls = false).toAggregateExpression(),
+      First($"b", ignoreNulls = true).toAggregateExpression()).analyze
+    val rewrite = RewriteNonAggregateFirst(input.analyze)
+    comparePlans(input, rewrite)
+  }
+
+  test("FIRST aggregate with group by") {
+    val input = testRelation
+      .groupBy($"a")(First($"a", ignoreNulls = false).toAggregateExpression())
+      .analyze
+    val rewrite = RewriteNonAggregateFirst(input)
+    comparePlans(input, rewrite)
+  }
+
+  test("mixed aggregates with group by") {
+    val input = testRelation
+      .groupBy('a)(
+        First($"a", ignoreNulls = false).toAggregateExpression().as('agg1),
+        Max($"b").toAggregateExpression().as('agg2))
+      .analyze
+    val rewrite = RewriteNonAggregateFirst(input)
+    comparePlans(input, rewrite)
+  }
+
+  test("mixed aggregates without group by") {

Review Comment:
   Let's also add a test case like the one Bart described, where the input is empty?
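
   The empty-input case matters because the two query shapes differ in row count. Below is a minimal plain-Scala sketch (no Spark dependency; `FirstVsLimit` and its method names are hypothetical, and rows are modeled as `Option[String]` with `None` standing in for NULL). It assumes standard SQL semantics: a global aggregate always emits one row, while a projection with `LIMIT 1` emits none when the input is empty. Keeping the aggregate on top of the limit, which is the `Aggregate(_, _, GlobalLimit(_, _))` shape that `checkRewrite` matches in the diff above, preserves the one-row result.

```scala
// Plain-Scala model of the semantics; every name here is hypothetical.
object FirstVsLimit {
  type Row = Option[String] // None models SQL NULL

  // FIRST(col) with ignoreNulls = false and no GROUP BY:
  // always exactly one output row, NULL when the input is empty.
  def firstAggregate(rows: Seq[Row]): Seq[Row] =
    Seq(rows.headOption.flatMap(x => x))

  // SELECT col FROM table LIMIT 1: at most one row, zero rows for empty input.
  def projectLimitOne(rows: Seq[Row]): Seq[Row] =
    rows.take(1)

  // Aggregate kept on top of the limit: the limit bounds the rows read,
  // the aggregate still guarantees a single output row.
  def firstOverLimit(rows: Seq[Row]): Seq[Row] =
    firstAggregate(projectLimitOne(rows))
}
```

   On an empty input, `projectLimitOne` returns no rows, whereas `firstAggregate` and `firstOverLimit` both return a single NULL row, so only the last shape is a safe replacement.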



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] amaliujia commented on a diff in pull request #36527: [SPARK-39169][SQL] Optimize FIRST when used as a single aggregate fun…

Posted by GitBox <gi...@apache.org>.
amaliujia commented on code in PR #36527:
URL: https://github.com/apache/spark/pull/36527#discussion_r872708265


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteNonAggregateFirst.scala:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, First}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.FIRST
+import org.apache.spark.sql.types.IntegerType
+
+/**
+ * Rewrite aggregate plan with a single [[First]] function when grouping is absent. In such a case
+ * the query is equivalent to simple projection with limit 1.
+ *
+ * Input Pseudo-Query:
+ * {{{
+ *   SELECT FIRST(col) FROM table
+ * }}}
+ *
+ * Rewritten Query:
+ * {{{
+ *   SELECT col FROM table LIMIT 1

Review Comment:
   I didn't know there was a way to do O(1) pruning. In that case, this is not a concern.
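
   For readers unfamiliar with the idea, here is a rough plain-Scala sketch of pattern-bit pruning. It is not Spark's actual implementation: `PruningSketch`, `Node`, and the integer pattern ids are hypothetical. It assumes each node caches the union of its own pattern bits and its children's, so a check like `containsPattern(FIRST)` is a single bit lookup and subtrees without the pattern are never traversed.

```scala
import scala.collection.immutable.BitSet

// Illustrative model only; names and ids are hypothetical, not Spark's API.
object PruningSketch {
  val FIRST = 0 // hypothetical pattern id
  val LIMIT = 1 // hypothetical pattern id

  final case class Node(name: String, ownPatterns: BitSet, children: Seq[Node]) {
    // Computed once per node: own bits plus the bits of every descendant.
    val patternBits: BitSet = children.foldLeft(ownPatterns)(_ | _.patternBits)

    // A single bit lookup, independent of subtree size.
    def containsPattern(pattern: Int): Boolean = patternBits.contains(pattern)
  }

  // Returns how many nodes were visited; a subtree whose cached bits lack
  // the pattern is skipped without looking at any of its children.
  def visitWithPruning(node: Node, pattern: Int): Int =
    if (!node.containsPattern(pattern)) 0
    else 1 + node.children.map(visitWithPruning(_, pattern)).sum
}
```

   With a leaf relation that carries no patterns under an aggregate node tagged with `FIRST`, the traversal visits the aggregate and skips the leaf, and a plan containing no `FIRST` at all is rejected at the root.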





[GitHub] [spark] vli-databricks commented on a diff in pull request #36527: [SPARK-39169][SQL] Optimize FIRST when used as a single aggregate fun…

Posted by GitBox <gi...@apache.org>.
vli-databricks commented on code in PR #36527:
URL: https://github.com/apache/spark/pull/36527#discussion_r872918076


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteNonAggregateFirst.scala:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, First}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.FIRST
+import org.apache.spark.sql.types.IntegerType
+
+/**
+ * Rewrite aggregate plan with a single [[First]] function when grouping is absent. In such a case
+ * the query is equivalent to simple projection with limit 1.
+ *
+ * Input Pseudo-Query:
+ * {{{
+ *   SELECT FIRST(col) FROM table
+ * }}}
+ *
+ * Rewritten Query:
+ * {{{
+ *   SELECT col FROM table LIMIT 1
+ * }}}
+ *
+ * Note that using IGNORE NULLS with [[First]] blocks the rewrite, since a projection with a
+ * NOT NULL filter might return a different result than [[First]] if all values are NULL.
+ */
+object RewriteNonAggregateFirst extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.transformWithPruning(_.containsPattern(FIRST), ruleId) {
+      case agg: Aggregate if isNonAggregateFirst(agg) =>
+        rewriteNonAggregateFirstToProject(agg)
+    }
+  }
+
+  private def isNonAggregateFirst(agg: Aggregate): Boolean = {
+    agg.aggregateExpressions.length == 1 && agg.groupingExpressions.isEmpty
+  }
+
+  private def rewriteNonAggregateFirstToProject(agg: Aggregate): LogicalPlan = {

Review Comment:
   Done





[GitHub] [spark] amaliujia commented on a diff in pull request #36527: [SPARK-39169][SQL] Optimize FIRST when used as a single aggregate fun…

Posted by GitBox <gi...@apache.org>.
amaliujia commented on code in PR #36527:
URL: https://github.com/apache/spark/pull/36527#discussion_r872706458


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RewriteNonAggregateFirstSuite.scala:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.aggregate.First
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical._
+
+class RewriteNonAggregateFirstSuite extends PlanTest {
+  val testRelation: LocalRelation = LocalRelation('a.string, 'b.string)
+
+  private def checkRewrite(rewrite: LogicalPlan): Unit = rewrite match {
+    case GlobalLimit(_, LocalLimit(_, Project(_, _))) =>
+    case _ => fail(s"Plan is not rewritten:\n$rewrite")
+  }
+
+  test("no group by and single first aggregate") {
+    val input = testRelation.select(first('a)).analyze
+    val rewrite = RewriteNonAggregateFirst(input)
+    checkRewrite(rewrite)
+  }
+
+  test("single first aggregate with group by") {
+    val input = testRelation
+      .groupBy('a)(first('b))
+      .analyze
+    val rewrite = RewriteNonAggregateFirst(input)
+    comparePlans(input, rewrite)
+  }
+
+  test("multiple aggregates with group by") {
+    val input = testRelation
+      .groupBy('a)(
+        first('a).as('agg1),
+        max('b).as('agg2))
+      .analyze
+    val rewrite = RewriteNonAggregateFirst(input)
+    comparePlans(input, rewrite)
+  }
+
+  test("multiple aggregates without group by") {
+    val input = testRelation
+      .select(first('a).as('agg1), max('b).as('agg2))
+      .analyze
+    val rewrite = RewriteNonAggregateFirst(input)
+    comparePlans(input, rewrite)
+  }
+
+  test("no group by and single first aggregate with ignore nulls") {
+    val input = testRelation
+      .select(First('a, ignoreNulls = true).toAggregateExpression())

Review Comment:
   +1 





[GitHub] [spark] vli-databricks commented on a diff in pull request #36527: [SPARK-39169][SQL] Optimize FIRST when used as a single aggregate fun…

Posted by GitBox <gi...@apache.org>.
vli-databricks commented on code in PR #36527:
URL: https://github.com/apache/spark/pull/36527#discussion_r872918192


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteNonAggregateFirst.scala:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, First}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.FIRST
+import org.apache.spark.sql.types.IntegerType
+
+/**
+ * Rewrite aggregate plan with a single [[First]] function when grouping is absent. In such a case
+ * the query is equivalent to simple projection with limit 1.
+ *
+ * Input Pseudo-Query:
+ * {{{
+ *   SELECT FIRST(col) FROM table
+ * }}}
+ *
+ * Rewritten Query:
+ * {{{
+ *   SELECT col FROM table LIMIT 1
+ * }}}
+ *
+ * Note that using IGNORE NULLS with [[First]] blocks the rewrite, since a projection with a
+ * NOT NULL filter might return a different result than [[First]] if all values are NULL.
+ */
+object RewriteNonAggregateFirst extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.transformWithPruning(_.containsPattern(FIRST), ruleId) {
+      case agg: Aggregate if isNonAggregateFirst(agg) =>
+        rewriteNonAggregateFirstToProject(agg)
+    }
+  }
+
+  private def isNonAggregateFirst(agg: Aggregate): Boolean = {
+    agg.aggregateExpressions.length == 1 && agg.groupingExpressions.isEmpty
+  }
+
+  private def rewriteNonAggregateFirstToProject(agg: Aggregate): LogicalPlan = {
+    var skipRewrite = false
+    val projectList = agg.aggregateExpressions.map {
+      case alias: Alias =>

Review Comment:
   Redid, since the logic changed.





[GitHub] [spark] vli-databricks commented on a diff in pull request #36527: [SPARK-39169][SQL] Optimize FIRST when used as a single aggregate fun…

Posted by GitBox <gi...@apache.org>.
vli-databricks commented on code in PR #36527:
URL: https://github.com/apache/spark/pull/36527#discussion_r874028670


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RewriteNonAggregateFirstSuite.scala:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Max}
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical._
+
+class RewriteNonAggregateFirstSuite extends PlanTest {
+  val testRelation: LocalRelation = LocalRelation($"a".string, $"b".string)
+
+  private def checkRewrite(rewrite: LogicalPlan): Unit = rewrite match {
+    case Aggregate(_, _, GlobalLimit(_, _)) =>
+    case _ => fail(s"Plan is not rewritten:\n$rewrite")
+  }
+
+  test("single FIRST aggregate and no group by") {
+    val input = testRelation.select(
+      First($"a", ignoreNulls = false).toAggregateExpression()).analyze
+    val rewrite = RewriteNonAggregateFirst(input.analyze)
+    checkRewrite(rewrite)
+  }
+
+  test("multiple FIRST aggregates and no group by") {
+    val input = testRelation.select(
+      First($"a", ignoreNulls = false).toAggregateExpression(),
+      First($"b", ignoreNulls = false).toAggregateExpression()).analyze
+    val rewrite = RewriteNonAggregateFirst(input.analyze)
+    checkRewrite(rewrite)
+  }
+
+  test("ignoreNulls set to true blocks rewrite") {
+    val input = testRelation.select(
+      First($"a", ignoreNulls = false).toAggregateExpression(),
+      First($"b", ignoreNulls = true).toAggregateExpression()).analyze
+    val rewrite = RewriteNonAggregateFirst(input.analyze)
+    comparePlans(input, rewrite)
+  }
+
+  test("FIRST aggregate with group by") {
+    val input = testRelation
+      .groupBy($"a")(First($"a", ignoreNulls = false).toAggregateExpression())
+      .analyze
+    val rewrite = RewriteNonAggregateFirst(input)
+    comparePlans(input, rewrite)
+  }
+
+  test("mixed aggregates with group by") {
+    val input = testRelation
+      .groupBy('a)(
+        First($"a", ignoreNulls = false).toAggregateExpression().as('agg1),
+        Max($"b").toAggregateExpression().as('agg2))
+      .analyze
+    val rewrite = RewriteNonAggregateFirst(input)
+    comparePlans(input, rewrite)
+  }
+
+  test("mixed aggregates without group by") {

Review Comment:
   Added in `misc-aggregate.sql`.


