Posted to reviews@spark.apache.org by "zml1206 (via GitHub)" <gi...@apache.org> on 2023/09/27 08:37:17 UTC

[GitHub] [spark] zml1206 opened a new pull request, #43144: [SPARK-45352][SQL] Remove window partition if partition expressions are foldable

zml1206 opened a new pull request, #43144:
URL: https://github.com/apache/spark/pull/43144

   ### What changes were proposed in this pull request?
   This PR adds a new optimizer rule, `OptimizeWindowPartitions`, which removes window partition expressions that are foldable.
   Sql1:
   `select row_number() over(order by a) b from t`
   Sql2:
   `select row_number() over(partition by 1 order by a) b from t`
   After this PR, the `optimizedPlan` for sql1 and sql2 is the same.
   
   ### Why are the changes needed?
   A foldable partition expression is redundant. Removing it not only simplifies the plan but also lets other rules, such as `InferWindowGroupLimit`, take effect when all partition expressions are foldable.
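
   As a rough illustration of why a constant partition key is redundant, here is a toy model in plain Scala (not Spark code). The `rowNumber` helper and the `FoldablePartitionDemo` object are hypothetical names used only for this sketch. Partitioning by a constant puts every row in one group, so the row numbers match those of an unpartitioned window.

   ```scala
   // Toy model, not Spark code: rows are plain Ints and `rowNumber` is a hypothetical helper.
   object FoldablePartitionDemo {
     // Assigns 1-based row numbers within each partition, ordered by value.
     // Returns (value, rowNumber) pairs sorted by value so results are easy to compare.
     def rowNumber[K](rows: Seq[Int])(partitionKey: Int => K): Seq[(Int, Int)] =
       rows
         .groupBy(partitionKey)
         .values
         .flatMap(part => part.sorted.zipWithIndex.map { case (v, i) => (v, i + 1) })
         .toSeq
         .sorted

     def main(args: Array[String]): Unit = {
       val rows = Seq(30, 10, 20)
       // Models over(order by a): every row falls into one implicit partition.
       val noPartition = rowNumber(rows)(_ => "all")
       // Models over(partition by 1 order by a): the key is the same constant for every row.
       val constantPartition = rowNumber(rows)(_ => 1)
       assert(noPartition == constantPartition)
       println(noPartition)
     }
   }
   ```

   Partitioning by a non-constant key, for example `v => v % 2`, would generally give different row numbers, which is why only foldable expressions are safe to drop.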
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   Unit tests.
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45352][SQL] Eliminate foldable window partitions [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #43144:
URL: https://github.com/apache/spark/pull/43144#discussion_r1437087368


##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFramesSuite.scala:
##########
@@ -548,4 +548,19 @@ class DataFrameWindowFramesSuite extends QueryTest with SharedSparkSession {
       df,
       Row(1) :: Row(1) :: Nil)
   }
+
+  test("SPARK-45352: Remove window partition if partition expression are foldable") {
+    val ds = Seq((1, 1), (1, 2), (1, 3), (2, 1), (2, 2)).toDF("n", "i")
+    val sortOrder = SortOrder($"i".expr, Ascending)
+    val window1 = new WindowSpec(Seq(), Seq(sortOrder), UnspecifiedFrame)
+    val window2 = new WindowSpec(Seq(lit(1).expr), Seq(sortOrder), UnspecifiedFrame)
+    val df1 = ds.select(row_number().over(window1).alias("num"))
+    val df2 = ds.select(row_number().over(window2).alias("num"))

Review Comment:
   Updated, thanks.





Re: [PR] [SPARK-45352][SQL] Remove window partition if partition expressions are foldable [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #43144:
URL: https://github.com/apache/spark/pull/43144#discussion_r1349455263


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -1241,6 +1242,23 @@ object OptimizeRepartition extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Remove window partition if partition expressions are foldable
+ */
+object EliminateWindowPartitions extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
+    _.containsPattern(WINDOW), ruleId) {
+      case w @ Window(we, ps, _, _) if ps.exists(_.foldable) =>
+        val newWe = we.map {
+          _.transform {
+            case wsc @ WindowSpecDefinition(_ps, _, _) if _ps.exists(_.foldable) =>
+              wsc.copy(partitionSpec = _ps.filter(!_.foldable))
+          }.asInstanceOf[NamedExpression]
+        }
+        w.copy(windowExpressions = newWe, partitionSpec = ps.filter(!_.foldable))

Review Comment:
   I haven't actually built this; it is just for reference. Please make sure it is correct.
   
   Could we make the variable names more readable? e.g. `we` => `windowExprs` or `windowExpressions`, and likewise for the others.





[GitHub] [spark] beliefer commented on a diff in pull request #43144: [SPARK-45352][SQL] Remove window partition if partition expressions are foldable

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #43144:
URL: https://github.com/apache/spark/pull/43144#discussion_r1338435883


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -1241,6 +1242,23 @@ object OptimizeRepartition extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Remove window partition if partition expressions are foldable
+ */
+object OptimizeWindowPartitions extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = removeWindowExpressionPartitions(plan)
+    .transformWithPruning(_.containsPattern(WINDOW), ruleId) {
+      case w @ Window(_, ps, _, _) =>
+        w.copy(partitionSpec = ps.filter(!_.foldable))
+    }
+
+  def removeWindowExpressionPartitions(plan: LogicalPlan): LogicalPlan =
+    plan.transformAllExpressionsWithPruning(_.containsAnyPattern(WINDOW_EXPRESSION), ruleId) {

Review Comment:
   ```suggestion
       plan.transformAllExpressionsWithPruning(_.containsPattern(WINDOW_EXPRESSION), ruleId) {
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -1241,6 +1242,23 @@ object OptimizeRepartition extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Remove window partition if partition expressions are foldable
+ */
+object OptimizeWindowPartitions extends Rule[LogicalPlan] {

Review Comment:
   `OptimizeWindowPartitions` -> `EliminateWindowPartitions`



##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFramesSuite.scala:
##########
@@ -513,4 +513,14 @@ class DataFrameWindowFramesSuite extends QueryTest with SharedSparkSession {
       df,
       Row(1) :: Row(1) :: Nil)
   }
+
+  test("SPARK-45352: Remove window partition if partition expression are foldable") {
+    val ds = Seq(1, 2, 3).toDF("i")
+    val sortOrder = SortOrder($"i".expr, Ascending)
+    val window1 = new WindowSpec(Seq(), Seq(sortOrder), UnspecifiedFrame)
+    val window2 = new WindowSpec(Seq(lit(1).expr), Seq(sortOrder), UnspecifiedFrame)

Review Comment:
   Please add more test cases:
   
   - Only non-foldable partition specs.
   - A mix of foldable and non-foldable partition specs.
   



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -1241,6 +1242,23 @@ object OptimizeRepartition extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Remove window partition if partition expressions are foldable
+ */
+object OptimizeWindowPartitions extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = removeWindowExpressionPartitions(plan)
+    .transformWithPruning(_.containsPattern(WINDOW), ruleId) {
+      case w @ Window(_, ps, _, _) =>
+        w.copy(partitionSpec = ps.filter(!_.foldable))
+    }
+
+  def removeWindowExpressionPartitions(plan: LogicalPlan): LogicalPlan =
+    plan.transformAllExpressionsWithPruning(_.containsAnyPattern(WINDOW_EXPRESSION), ruleId) {
+      case we @ WindowExpression(_, ws @ WindowSpecDefinition(ps, _, _)) =>

Review Comment:
   ```suggestion
         case we @ WindowExpression(_, ws @ WindowSpecDefinition(ps, _, _)) if ps.exists(_.foldable) =>
   ```





Re: [PR] [SPARK-45352][SQL] Eliminate foldable window partitions [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #43144:
URL: https://github.com/apache/spark/pull/43144#discussion_r1440353552


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateWindowPartitionsSuite.scala:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+
+class EliminateWindowPartitionsSuite extends PlanTest {
+
+  private object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("Eliminate window partitions", FixedPoint(20),
+        EliminateWindowPartitions) :: Nil
+  }
+
+  val testRelation = LocalRelation($"a".int, $"b".int)
+  private val a = testRelation.output(0)
+  private val b = testRelation.output(1)
+  private val windowFrame = SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)
+
+  test("Remove foldable window partitions") {
+    val originalQuery =
+      testRelation
+        .select(a, b,
+          windowExpr(RowNumber(),
+            windowSpec(Literal(1) :: Nil, b.desc :: Nil, windowFrame)).as("rn"))
+
+    val correctAnswer =
+      testRelation
+        .select(a, b,
+          windowExpr(RowNumber(),
+            windowSpec(Nil, b.desc :: Nil, windowFrame)).as("rn"))

Review Comment:
   Yes. All the data rows are in a single partition.





Re: [PR] [SPARK-45352][SQL] Remove window partition if partition expressions are foldable [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #43144:
URL: https://github.com/apache/spark/pull/43144#discussion_r1349453303


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -1241,6 +1242,23 @@ object OptimizeRepartition extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Remove window partition if partition expressions are foldable
+ */
+object EliminateWindowPartitions extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
+    _.containsPattern(WINDOW), ruleId) {
+      case w @ Window(we, ps, _, _) if ps.exists(_.foldable) =>
+        val newWe = we.map {
+          _.transform {
+            case wsc @ WindowSpecDefinition(_ps, _, _) if _ps.exists(_.foldable) =>
+              wsc.copy(partitionSpec = _ps.filter(!_.foldable))
+          }.asInstanceOf[NamedExpression]
+        }
+        w.copy(windowExpressions = newWe, partitionSpec = ps.filter(!_.foldable))

Review Comment:
   Should it be done like this instead? The `WINDOW_EXPRESSION` pattern does not match `WindowSpecDefinition`.
   ```
         case w @ Window(we, ps, _, _) if ps.exists(_.foldable) =>
           val newWe = we.map(_.transformWithPruning(_.containsPattern(WINDOW_EXPRESSION)) {
             case _we @ WindowExpression(_, wsd @ WindowSpecDefinition(_ps, _, _))
               if _ps.exists(_.foldable) =>
               val newWsd = wsd.copy(partitionSpec = _ps.filter(!_.foldable))
               _we.copy(windowSpec = newWsd)
             }.asInstanceOf[NamedExpression])
           w.copy(windowExpressions = newWe, partitionSpec = ps.filter(!_.foldable))
   ```
   





Re: [PR] [SPARK-45352][SQL] Eliminate foldable window partitions [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #43144:
URL: https://github.com/apache/spark/pull/43144#discussion_r1377013427


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -1241,6 +1242,24 @@ object OptimizeRepartition extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Remove window partition if partition expressions are foldable
+ */
+object EliminateWindowPartitions extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
+    _.containsPattern(WINDOW), ruleId) {
+    case w @ Window(windowExprs, partitionSpec, _, _) if partitionSpec.exists(_.foldable) =>
+      val newWindowExprs = windowExprs.map(_.transformWithPruning(
+        _.containsPattern(WINDOW_EXPRESSION)) {
+        case windowExpr @ WindowExpression(_, wsd @ WindowSpecDefinition(ps, _, _))
+          if ps.exists(_.foldable) =>
+          val newWsd = wsd.copy(partitionSpec = ps.filter(!_.foldable))
+          windowExpr.copy(windowSpec = newWsd)

Review Comment:
   Which object uses `withNewChildren`? What is the purpose of using `withNewChildren`?





[GitHub] [spark] zml1206 commented on a diff in pull request #43144: [SPARK-45352][SQL] Remove window partition if partition expressions are foldable

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #43144:
URL: https://github.com/apache/spark/pull/43144#discussion_r1338463707


##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFramesSuite.scala:
##########
@@ -513,4 +513,14 @@ class DataFrameWindowFramesSuite extends QueryTest with SharedSparkSession {
       df,
       Row(1) :: Row(1) :: Nil)
   }
+
+  test("SPARK-45352: Remove window partition if partition expression are foldable") {
+    val ds = Seq(1, 2, 3).toDF("i")
+    val sortOrder = SortOrder($"i".expr, Ascending)
+    val window1 = new WindowSpec(Seq(), Seq(sortOrder), UnspecifiedFrame)
+    val window2 = new WindowSpec(Seq(lit(1).expr), Seq(sortOrder), UnspecifiedFrame)

Review Comment:
   @beliefer Thanks, updated all.





Re: [PR] [SPARK-45352][SQL] Eliminate foldable window partitions [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43144:
URL: https://github.com/apache/spark/pull/43144#discussion_r1440109131


##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFramesSuite.scala:
##########
@@ -548,4 +548,18 @@ class DataFrameWindowFramesSuite extends QueryTest with SharedSparkSession {
       df,
       Row(1) :: Row(1) :: Nil)
   }
+
+  test("SPARK-45352: Eliminate foldable window partitions") {

Review Comment:
   Shall we run the test twice, once with the new optimizer rule turned on and once with it turned off, to make sure the result is the same?
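
   One possible way to run such an on/off comparison outside the test suite is sketched below. It assumes Spark is on the classpath, that the rule's fully qualified name is the package plus object name shown in this PR, and that the rule can be excluded through the `spark.sql.optimizer.excludedRules` setting. `RuleOnOffCheck` and `rowNumbers` are hypothetical names, and this is not the test that was added to the PR.

   ```scala
   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.expressions.Window
   import org.apache.spark.sql.functions.{lit, row_number}

   object RuleOnOffCheck {
     // Assumption: fully qualified rule name, derived from the package and object in this PR.
     val RuleName = "org.apache.spark.sql.catalyst.optimizer.EliminateWindowPartitions"

     // Builds the query from scratch on every call so it is planned under the current conf.
     def rowNumbers(spark: SparkSession): Seq[Int] = {
       import spark.implicits._
       val ds = Seq((1, 1), (1, 2), (1, 3), (2, 1), (2, 2)).toDF("n", "i")
       val w = Window.partitionBy(lit(1)).orderBy($"i")
       // Only the row numbers are kept, so ties in "i" do not affect the comparison.
       ds.select(row_number().over(w).alias("num")).as[Int].collect().toSeq.sorted
     }

     def main(args: Array[String]): Unit = {
       val spark = SparkSession.builder().master("local[1]").appName("rule-on-off").getOrCreate()
       try {
         val withRule = rowNumbers(spark)
         // Exclude the rule and plan the same query again.
         spark.conf.set("spark.sql.optimizer.excludedRules", RuleName)
         val withoutRule = rowNumbers(spark)
         assert(withRule == withoutRule)
       } finally spark.stop()
     }
   }
   ```

   Rebuilding the DataFrame after changing the setting matters here, because a Dataset caches its optimized plan once it has been computed.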





Re: [PR] [SPARK-45352][SQL] Remove window partition if partition expressions are foldable [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #43144:
URL: https://github.com/apache/spark/pull/43144#discussion_r1346345573


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -1241,6 +1242,23 @@ object OptimizeRepartition extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Remove window partition if partition expressions are foldable
+ */
+object EliminateWindowPartitions extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = removeWindowExpressionPartitions(plan)
+    .transformWithPruning(_.containsPattern(WINDOW), ruleId) {
+      case w @ Window(_, ps, _, _) if ps.exists(_.foldable) =>
+        w.copy(partitionSpec = ps.filter(!_.foldable))
+    }
+
+  def removeWindowExpressionPartitions(plan: LogicalPlan): LogicalPlan =
+    plan.transformAllExpressionsWithPruning(_.containsPattern(WINDOW_EXPRESSION), ruleId) {
+      case we @ WindowExpression(_, ws @ WindowSpecDefinition(ps, _, _)) if ps.exists(_.foldable) =>
+        we.copy(windowSpec = ws.copy(partitionSpec = ps.filter(!_.foldable)))
+    }
+}

Review Comment:
   If I do as you say, the `partitionSpec` in `WindowExpression` will become inconsistent with the `partitionSpec` in `Window`, which may introduce hidden risks.
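
   The consistency concern can be illustrated with a toy model. The case classes below are simplified stand-ins that only borrow the names of the Catalyst classes; they are not Spark's real definitions, and `eliminate` is a hypothetical helper. The point of the sketch is that the foldable expressions are dropped from the `Window` node and from every nested window spec in the same step, so the two partition specs cannot drift apart.

   ```scala
   // Toy stand-ins for the Catalyst classes; NOT Spark's real definitions.
   object EliminateSketch {
     sealed trait Expr { def foldable: Boolean }
     case class Literal(value: Int) extends Expr { val foldable = true }
     case class Attr(name: String) extends Expr { val foldable = false }

     case class WindowSpecDefinition(partitionSpec: Seq[Expr])
     case class WindowExpression(function: String, windowSpec: WindowSpecDefinition)
     case class Window(windowExpressions: Seq[WindowExpression], partitionSpec: Seq[Expr])

     // Drops foldable partition expressions from the node and from each nested spec together.
     def eliminate(w: Window): Window =
       if (!w.partitionSpec.exists(_.foldable)) w
       else {
         val newExprs = w.windowExpressions.map { we =>
           val newSpec = we.windowSpec.copy(
             partitionSpec = we.windowSpec.partitionSpec.filterNot(_.foldable))
           we.copy(windowSpec = newSpec)
         }
         w.copy(windowExpressions = newExprs, partitionSpec = w.partitionSpec.filterNot(_.foldable))
       }

     def main(args: Array[String]): Unit = {
       val spec = Seq(Literal(1), Attr("n"))
       val before = Window(Seq(WindowExpression("row_number", WindowSpecDefinition(spec))), spec)
       val after = eliminate(before)
       // Both levels now agree on the remaining partition expressions.
       assert(after.partitionSpec == Seq(Attr("n")))
       assert(after.windowExpressions.forall(_.windowSpec.partitionSpec == after.partitionSpec))
     }
   }
   ```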





Re: [PR] [SPARK-45352][SQL] Remove window partition if partition expressions are foldable [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #43144:
URL: https://github.com/apache/spark/pull/43144#discussion_r1349468313


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -1241,6 +1242,23 @@ object OptimizeRepartition extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Remove window partition if partition expressions are foldable
+ */
+object EliminateWindowPartitions extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
+    _.containsPattern(WINDOW), ruleId) {
+    case w @ Window(windowExprs, partitionSpec, _, _) if partitionSpec.exists(_.foldable) =>
+      val newWe = windowExprs.map(_.transformWithPruning(_.containsPattern(WINDOW_EXPRESSION)) {

Review Comment:
   Oh, I missed this one. Please rename `newWe` to `newWindowExprs`.





Re: [PR] [SPARK-45352][SQL] Remove window partition if partition expressions are foldable [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #43144:
URL: https://github.com/apache/spark/pull/43144#discussion_r1349442662


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -1241,6 +1242,23 @@ object OptimizeRepartition extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Remove window partition if partition expressions are foldable
+ */
+object EliminateWindowPartitions extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
+    _.containsPattern(WINDOW), ruleId) {
+      case w @ Window(we, ps, _, _) if ps.exists(_.foldable) =>
+        val newWe = we.map {
+          _.transform {
+            case wsc @ WindowSpecDefinition(_ps, _, _) if _ps.exists(_.foldable) =>
+              wsc.copy(partitionSpec = _ps.filter(!_.foldable))
+          }.asInstanceOf[NamedExpression]
+        }
+        w.copy(windowExpressions = newWe, partitionSpec = ps.filter(!_.foldable))

Review Comment:
   ```
         case w @ Window(windowExpressions, partitionSpec, _, _) if partitionSpec.exists(_.foldable) =>
           val newWindowExpressions =
             windowExpressions.map(_.transformWithPruning(_.containsPattern(WINDOW_EXPRESSION)) {
               case wsd @ WindowSpecDefinition(ps, _, _) if ps.exists(_.foldable) =>
                 wsd.copy(partitionSpec = ps.filter(!_.foldable))
             }.asInstanceOf[NamedExpression])
   ```





Re: [PR] [SPARK-45352][SQL] Remove window partition if partition expressions are foldable [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #43144:
URL: https://github.com/apache/spark/pull/43144#discussion_r1349469364


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -1241,6 +1242,23 @@ object OptimizeRepartition extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Remove window partition if partition expressions are foldable
+ */
+object EliminateWindowPartitions extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
+    _.containsPattern(WINDOW), ruleId) {
+    case w @ Window(windowExprs, partitionSpec, _, _) if partitionSpec.exists(_.foldable) =>
+      val newWe = windowExprs.map(_.transformWithPruning(_.containsPattern(WINDOW_EXPRESSION)) {

Review Comment:
   Done





Re: [PR] [SPARK-45352][SQL] Eliminate foldable window partitions [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43144:
URL: https://github.com/apache/spark/pull/43144#discussion_r1440107589


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/EliminateWindowPartitions.scala:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{NamedExpression, WindowExpression, WindowSpecDefinition}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Window}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.{WINDOW, WINDOW_EXPRESSION}
+
+/**
+ * Remove window partition if partition expressions are foldable.
+ */
+object EliminateWindowPartitions extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
+    _.containsPattern(WINDOW), ruleId) {
+    case w @ Window(windowExprs, partitionSpec, _, _) if partitionSpec.exists(_.foldable) =>
+      val newWindowExprs = windowExprs.map(_.transformWithPruning(
+        _.containsPattern(WINDOW_EXPRESSION)) {
+        case windowExpr @ WindowExpression(_, wsd @ WindowSpecDefinition(ps, _, _))
+          if ps.exists(_.foldable) =>
+          val newWsd = wsd.copy(partitionSpec = ps.filter(!_.foldable))
+          windowExpr.withNewChildren(Seq(windowExpr.windowFunction, newWsd))

Review Comment:
   nit: `windowExpr.copy(windowSpec = newWsd)`





Re: [PR] [SPARK-45352][SQL] Remove window partition if partition expressions are foldable [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #43144:
URL: https://github.com/apache/spark/pull/43144#discussion_r1347438713


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -1241,6 +1242,23 @@ object OptimizeRepartition extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Remove window partition if partition expressions are foldable
+ */
+object EliminateWindowPartitions extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = removeWindowExpressionPartitions(plan)
+    .transformWithPruning(_.containsPattern(WINDOW), ruleId) {
+      case w @ Window(_, ps, _, _) if ps.exists(_.foldable) =>
+        w.copy(partitionSpec = ps.filter(!_.foldable))
+    }
+
+  def removeWindowExpressionPartitions(plan: LogicalPlan): LogicalPlan =
+    plan.transformAllExpressionsWithPruning(_.containsPattern(WINDOW_EXPRESSION), ruleId) {
+      case we @ WindowExpression(_, ws @ WindowSpecDefinition(ps, _, _)) if ps.exists(_.foldable) =>
+        we.copy(windowSpec = ws.copy(partitionSpec = ps.filter(!_.foldable)))
+    }
+}

Review Comment:
   I mean merging the logic of `removeWindowExpressionPartitions` into `transformWithPruning`.





Re: [PR] [SPARK-45352][SQL] Remove window partition if partition expressions are foldable [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #43144:
URL: https://github.com/apache/spark/pull/43144#discussion_r1347614906


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -1241,6 +1242,23 @@ object OptimizeRepartition extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Remove window partition if partition expressions are foldable
+ */
+object EliminateWindowPartitions extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = removeWindowExpressionPartitions(plan)
+    .transformWithPruning(_.containsPattern(WINDOW), ruleId) {
+      case w @ Window(_, ps, _, _) if ps.exists(_.foldable) =>
+        w.copy(partitionSpec = ps.filter(!_.foldable))
+    }
+
+  def removeWindowExpressionPartitions(plan: LogicalPlan): LogicalPlan =
+    plan.transformAllExpressionsWithPruning(_.containsPattern(WINDOW_EXPRESSION), ruleId) {
+      case we @ WindowExpression(_, ws @ WindowSpecDefinition(ps, _, _)) if ps.exists(_.foldable) =>
+        we.copy(windowSpec = ws.copy(partitionSpec = ps.filter(!_.foldable)))
+    }
+}

Review Comment:
   Ok, done.





Re: [PR] [SPARK-45352][SQL] Eliminate foldable window partitions [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on PR #43144:
URL: https://github.com/apache/spark/pull/43144#issuecomment-1880631838

   Thank you for the review, @cloud-fan @beliefer.




Re: [PR] [SPARK-45352][SQL] Eliminate foldable window partitions [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43144:
URL: https://github.com/apache/spark/pull/43144#discussion_r1437005120


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -1251,6 +1252,24 @@ object OptimizeRepartition extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Remove window partition if partition expressions are foldable
+ */
+object EliminateWindowPartitions extends Rule[LogicalPlan] {

Review Comment:
   nit: let's put it in a new file





Re: [PR] [SPARK-45352][SQL] Eliminate foldable window partitions [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43144:
URL: https://github.com/apache/spark/pull/43144#discussion_r1440108568


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateWindowPartitionsSuite.scala:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+
+class EliminateWindowPartitionsSuite extends PlanTest {
+
+  private object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("Eliminate window partitions", FixedPoint(20),
+        EliminateWindowPartitions) :: Nil
+  }
+
+  val testRelation = LocalRelation($"a".int, $"b".int)
+  private val a = testRelation.output(0)
+  private val b = testRelation.output(1)
+  private val windowFrame = SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)
+
+  test("Remove foldable window partitions") {
+    val originalQuery =
+      testRelation
+        .select(a, b,
+          windowExpr(RowNumber(),
+            windowSpec(Literal(1) :: Nil, b.desc :: Nil, windowFrame)).as("rn"))
+
+    val correctAnswer =
+      testRelation
+        .select(a, b,
+          windowExpr(RowNumber(),
+            windowSpec(Nil, b.desc :: Nil, windowFrame)).as("rn"))
+    comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze)
+  }
+
+  test("Remove part of window partitions which is foldable") {
+    val originalQuery =
+      testRelation
+        .select(a, b,
+          windowExpr(RowNumber(),
+            windowSpec(a :: Literal(1) :: Nil, b.desc :: Nil, windowFrame)).as("rn"))
+
+    val correctAnswer =
+      testRelation
+        .select(a, b,
+          windowExpr(RowNumber(),
+            windowSpec(a :: Nil, b.desc :: Nil, windowFrame)).as("rn"))
+    comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze)
+  }
+
+  test("Can't remove non-foldable window partitions") {
+    val originalQuery =
+      testRelation
+        .select(a, b,
+          windowExpr(RowNumber(),
+            windowSpec(a :: Nil, b.desc :: Nil, windowFrame)).as("rn"))
+
+    val correctAnswer =

Review Comment:
   nit: `val correctAnswer = originalQuery`





Re: [PR] [SPARK-45352][SQL] Remove window partition if partition expressions are foldable [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #43144:
URL: https://github.com/apache/spark/pull/43144#discussion_r1347438713


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -1241,6 +1242,23 @@ object OptimizeRepartition extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Remove window partition if partition expressions are foldable
+ */
+object EliminateWindowPartitions extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = removeWindowExpressionPartitions(plan)
+    .transformWithPruning(_.containsPattern(WINDOW), ruleId) {
+      case w @ Window(_, ps, _, _) if ps.exists(_.foldable) =>
+        w.copy(partitionSpec = ps.filter(!_.foldable))
+    }
+
+  def removeWindowExpressionPartitions(plan: LogicalPlan): LogicalPlan =
+    plan.transformAllExpressionsWithPruning(_.containsPattern(WINDOW_EXPRESSION), ruleId) {
+      case we @ WindowExpression(_, ws @ WindowSpecDefinition(ps, _, _)) if ps.exists(_.foldable) =>
+        we.copy(windowSpec = ws.copy(partitionSpec = ps.filter(!_.foldable)))
+    }
+}

Review Comment:
   What I mean is to merge the logic of `removeWindowExpressionPartitions` into `transformWithPruning`.





Re: [PR] [SPARK-45352][SQL] Remove window partition if partition expressions are foldable [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #43144:
URL: https://github.com/apache/spark/pull/43144#discussion_r1349453303


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -1241,6 +1242,23 @@ object OptimizeRepartition extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Remove window partition if partition expressions are foldable
+ */
+object EliminateWindowPartitions extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
+    _.containsPattern(WINDOW), ruleId) {
+      case w @ Window(we, ps, _, _) if ps.exists(_.foldable) =>
+        val newWe = we.map {
+          _.transform {
+            case wsc @ WindowSpecDefinition(_ps, _, _) if _ps.exists(_.foldable) =>
+              wsc.copy(partitionSpec = _ps.filter(!_.foldable))
+          }.asInstanceOf[NamedExpression]
+        }
+        w.copy(windowExpressions = newWe, partitionSpec = ps.filter(!_.foldable))

Review Comment:
   Should it be done like this instead? `WINDOW_EXPRESSION` does not match `WindowSpecDefinition`.
   ```
   case w @ Window(we, ps, _, _) if ps.exists(_.foldable) =>
     val newWe = we.map(_.transformWithPruning(_.containsPattern(WINDOW_EXPRESSION)) {
       case _we @ WindowExpression(_, wsd @ WindowSpecDefinition(_ps, _, _))
         if _ps.exists(_.foldable) =>
         val newWsd = wsd.copy(partitionSpec = _ps.filter(!_.foldable))
         _we.copy(windowSpec = newWsd)
     }.asInstanceOf[NamedExpression])
     w.copy(windowExpressions = newWe, partitionSpec = ps.filter(!_.foldable))
   ```
   





Re: [PR] [SPARK-45352][SQL] Remove window partition if partition expressions are foldable [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #43144:
URL: https://github.com/apache/spark/pull/43144#discussion_r1349456332


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -1241,6 +1242,23 @@ object OptimizeRepartition extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Remove window partition if partition expressions are foldable
+ */
+object EliminateWindowPartitions extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
+    _.containsPattern(WINDOW), ruleId) {
+      case w @ Window(we, ps, _, _) if ps.exists(_.foldable) =>
+        val newWe = we.map {
+          _.transform {
+            case wsc @ WindowSpecDefinition(_ps, _, _) if _ps.exists(_.foldable) =>
+              wsc.copy(partitionSpec = _ps.filter(!_.foldable))
+          }.asInstanceOf[NamedExpression]
+        }
+        w.copy(windowExpressions = newWe, partitionSpec = ps.filter(!_.foldable))

Review Comment:
   I adjusted the above code
   ```
         case w @ Window(windowExprs, partitionSpec, _, _) if partitionSpec.exists(_.foldable) =>
           val newWe = windowExprs.map(_.transformWithPruning(_.containsPattern(WINDOW_EXPRESSION)) {
             case windowExpr @ WindowExpression(_, wsd @ WindowSpecDefinition(ps, _, _))
               if ps.exists(_.foldable) =>
               val newWsd = wsd.copy(partitionSpec = ps.filter(!_.foldable))
               windowExpr.copy(windowSpec = newWsd)
           }.asInstanceOf[NamedExpression])
           w.copy(windowExpressions = newWe, partitionSpec = partitionSpec.filter(!_.foldable))
   ```





Re: [PR] [SPARK-45352][SQL] Eliminate foldable window partitions [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #43144:
URL: https://github.com/apache/spark/pull/43144#issuecomment-1880625597

   thanks, merging to master!




Re: [PR] [SPARK-45352][SQL] Eliminate foldable window partitions [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #43144:
URL: https://github.com/apache/spark/pull/43144#discussion_r1437087242


##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFramesSuite.scala:
##########
@@ -548,4 +548,19 @@ class DataFrameWindowFramesSuite extends QueryTest with SharedSparkSession {
       df,
       Row(1) :: Row(1) :: Nil)
   }
+
+  test("SPARK-45352: Remove window partition if partition expression are foldable") {

Review Comment:
   Done





Re: [PR] [SPARK-45352][SQL] Eliminate foldable window partitions [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43144:
URL: https://github.com/apache/spark/pull/43144#discussion_r1437006206


##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFramesSuite.scala:
##########
@@ -548,4 +548,19 @@ class DataFrameWindowFramesSuite extends QueryTest with SharedSparkSession {
       df,
       Row(1) :: Row(1) :: Nil)
   }
+
+  test("SPARK-45352: Remove window partition if partition expression are foldable") {

Review Comment:
   for a new optimizer rule, we should add unit tests, like `LimitPushdownSuite`





Re: [PR] [SPARK-45352][SQL] Eliminate foldable window partitions [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on PR #43144:
URL: https://github.com/apache/spark/pull/43144#issuecomment-1875093495

   Everything is updated. The CI runs have completed, but the page still shows them as running. @cloud-fan 




Re: [PR] [SPARK-45352][SQL] Remove window partition if partition expressions are foldable [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #43144:
URL: https://github.com/apache/spark/pull/43144#discussion_r1349460457


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -1241,6 +1242,23 @@ object OptimizeRepartition extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Remove window partition if partition expressions are foldable
+ */
+object EliminateWindowPartitions extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
+    _.containsPattern(WINDOW), ruleId) {
+      case w @ Window(we, ps, _, _) if ps.exists(_.foldable) =>
+        val newWe = we.map {
+          _.transform {
+            case wsc @ WindowSpecDefinition(_ps, _, _) if _ps.exists(_.foldable) =>
+              wsc.copy(partitionSpec = _ps.filter(!_.foldable))
+          }.asInstanceOf[NamedExpression]
+        }
+        w.copy(windowExpressions = newWe, partitionSpec = ps.filter(!_.foldable))

Review Comment:
   Updated.





Re: [PR] [SPARK-45352][SQL] Remove window partition if partition expressions are foldable [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #43144:
URL: https://github.com/apache/spark/pull/43144#discussion_r1345515920


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -1241,6 +1242,23 @@ object OptimizeRepartition extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Remove window partition if partition expressions are foldable
+ */
+object EliminateWindowPartitions extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = removeWindowExpressionPartitions(plan)
+    .transformWithPruning(_.containsPattern(WINDOW), ruleId) {
+      case w @ Window(_, ps, _, _) if ps.exists(_.foldable) =>
+        w.copy(partitionSpec = ps.filter(!_.foldable))
+    }
+
+  def removeWindowExpressionPartitions(plan: LogicalPlan): LogicalPlan =
+    plan.transformAllExpressionsWithPruning(_.containsPattern(WINDOW_EXPRESSION), ruleId) {
+      case we @ WindowExpression(_, ws @ WindowSpecDefinition(ps, _, _)) if ps.exists(_.foldable) =>
+        we.copy(windowSpec = ws.copy(partitionSpec = ps.filter(!_.foldable)))
+    }
+}

Review Comment:
   It seems we can simplify the code here by using only `transformWithPruning`; then we can remove `removeWindowExpressionPartitions`.
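
   A minimal sketch of that suggestion, using toy stand-ins rather than the real Catalyst classes. The names `Expr`, `Lit`, `Col`, `WinExpr`, `WindowOp` and `EliminateFoldablePartitions` are hypothetical and exist only for illustration. It shows a single match on the window operator that rewrites both the operator-level partition spec and the copy held by each window expression, so no separate helper pass is needed.

   ```scala
   // Toy stand-ins for Catalyst expressions (hypothetical, heavily simplified).
   sealed trait Expr { def foldable: Boolean }
   final case class Lit(value: Int) extends Expr { val foldable = true }
   final case class Col(name: String) extends Expr { val foldable = false }

   // Stand-in for a window expression that carries its own partition spec.
   final case class WinExpr(function: String, partitionSpec: Seq[Expr])

   // Stand-in for the Window operator, which repeats the partition spec.
   final case class WindowOp(windowExprs: Seq[WinExpr], partitionSpec: Seq[Expr])

   object EliminateFoldablePartitions {
     // One match on the operator updates both places that hold the spec.
     def apply(w: WindowOp): WindowOp =
       if (w.partitionSpec.exists(_.foldable)) {
         val newExprs = w.windowExprs.map { we =>
           we.copy(partitionSpec = we.partitionSpec.filterNot(_.foldable))
         }
         w.copy(windowExprs = newExprs, partitionSpec = w.partitionSpec.filterNot(_.foldable))
       } else {
         w // nothing foldable: return the operator unchanged
       }
   }
   ```

   In the real rule the inner rewrite walks an expression tree instead of a flat list, but the shape is the same: the outer match decides whether any work is needed, and the inner step mirrors the same filter on the nested spec.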





Re: [PR] [SPARK-45352][SQL] Eliminate foldable window partitions [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on PR #43144:
URL: https://github.com/apache/spark/pull/43144#issuecomment-1784359968

   cc @wangyum 




Re: [PR] [SPARK-45352][SQL] Eliminate foldable window partitions [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #43144:
URL: https://github.com/apache/spark/pull/43144#discussion_r1437087042


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -1251,6 +1252,24 @@ object OptimizeRepartition extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Remove window partition if partition expressions are foldable
+ */
+object EliminateWindowPartitions extends Rule[LogicalPlan] {

Review Comment:
   Done





Re: [PR] [SPARK-45352][SQL] Eliminate foldable window partitions [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on code in PR #43144:
URL: https://github.com/apache/spark/pull/43144#discussion_r1436460406


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -1241,6 +1242,24 @@ object OptimizeRepartition extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Remove window partition if partition expressions are foldable
+ */
+object EliminateWindowPartitions extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
+    _.containsPattern(WINDOW), ruleId) {
+    case w @ Window(windowExprs, partitionSpec, _, _) if partitionSpec.exists(_.foldable) =>
+      val newWindowExprs = windowExprs.map(_.transformWithPruning(
+        _.containsPattern(WINDOW_EXPRESSION)) {
+        case windowExpr @ WindowExpression(_, wsd @ WindowSpecDefinition(ps, _, _))
+          if ps.exists(_.foldable) =>
+          val newWsd = wsd.copy(partitionSpec = ps.filter(!_.foldable))
+          windowExpr.copy(windowSpec = newWsd)

Review Comment:
   Updated, thanks.





Re: [PR] [SPARK-45352][SQL] Eliminate foldable window partitions [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43144:
URL: https://github.com/apache/spark/pull/43144#discussion_r1437007194


##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFramesSuite.scala:
##########
@@ -548,4 +548,19 @@ class DataFrameWindowFramesSuite extends QueryTest with SharedSparkSession {
       df,
       Row(1) :: Row(1) :: Nil)
   }
+
+  test("SPARK-45352: Remove window partition if partition expression are foldable") {
+    val ds = Seq((1, 1), (1, 2), (1, 3), (2, 1), (2, 2)).toDF("n", "i")
+    val sortOrder = SortOrder($"i".expr, Ascending)
+    val window1 = new WindowSpec(Seq(), Seq(sortOrder), UnspecifiedFrame)
+    val window2 = new WindowSpec(Seq(lit(1).expr), Seq(sortOrder), UnspecifiedFrame)
+    val df1 = ds.select(row_number().over(window1).alias("num"))
+    val df2 = ds.select(row_number().over(window2).alias("num"))

Review Comment:
   for end-to-end tests, let's use end-to-end APIs, e.g. `Window.partitionBy($"key").orderBy($"value")`
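
   A hedged sketch of what such an end-to-end check could look like with only the public `Window` API. It assumes Spark SQL is on the classpath and that a local session is acceptable; the object name `FoldablePartitionEndToEnd` is hypothetical, and this is not the test that was eventually committed.

   ```scala
   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.expressions.Window
   import org.apache.spark.sql.functions.{lit, row_number}

   object FoldablePartitionEndToEnd {
     def main(args: Array[String]): Unit = {
       // Assumption: a local single-threaded session is enough for illustration.
       val spark = SparkSession.builder()
         .master("local[1]")
         .appName("foldable-window-partition")
         .getOrCreate()
       import spark.implicits._

       val ds = Seq((1, 1), (1, 2), (1, 3), (2, 1), (2, 2)).toDF("n", "i")

       // Public API only: no direct construction of WindowSpec or SortOrder.
       val noPartition = Window.orderBy($"i")
       val constantPartition = Window.partitionBy(lit(1)).orderBy($"i")

       val df1 = ds.select(row_number().over(noPartition).alias("num"))
       val df2 = ds.select(row_number().over(constantPartition).alias("num"))

       // Ties on "i" make the per-row numbering order-dependent,
       // so compare the sorted row numbers rather than row-by-row output.
       val nums1 = df1.as[Int].collect().sorted
       val nums2 = df2.as[Int].collect().sorted
       assert(nums1.sameElements(nums2))

       spark.stop()
     }
   }
   ```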





Re: [PR] [SPARK-45352][SQL] Eliminate foldable window partitions [spark]

Posted by "zml1206 (via GitHub)" <gi...@apache.org>.
zml1206 commented on PR #43144:
URL: https://github.com/apache/spark/pull/43144#issuecomment-1873652051

   @cloud-fan I have fixed the unit test. Could you take a look when you have time? Thanks.




Re: [PR] [SPARK-45352][SQL] Eliminate foldable window partitions [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #43144:
URL: https://github.com/apache/spark/pull/43144#discussion_r1376121522


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -1241,6 +1242,24 @@ object OptimizeRepartition extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Remove window partition if partition expressions are foldable
+ */
+object EliminateWindowPartitions extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
+    _.containsPattern(WINDOW), ruleId) {
+    case w @ Window(windowExprs, partitionSpec, _, _) if partitionSpec.exists(_.foldable) =>
+      val newWindowExprs = windowExprs.map(_.transformWithPruning(
+        _.containsPattern(WINDOW_EXPRESSION)) {
+        case windowExpr @ WindowExpression(_, wsd @ WindowSpecDefinition(ps, _, _))
+          if ps.exists(_.foldable) =>
+          val newWsd = wsd.copy(partitionSpec = ps.filter(!_.foldable))
+          windowExpr.copy(windowSpec = newWsd)

Review Comment:
   Please use `withNewChildren` instead.





Re: [PR] [SPARK-45352][SQL] Eliminate foldable window partitions [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on PR #43144:
URL: https://github.com/apache/spark/pull/43144#issuecomment-1785081862

   cc @cloud-fan 




Re: [PR] [SPARK-45352][SQL] Eliminate foldable window partitions [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #43144:
URL: https://github.com/apache/spark/pull/43144#discussion_r1377081807


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -1241,6 +1242,24 @@ object OptimizeRepartition extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Remove window partition if partition expressions are foldable
+ */
+object EliminateWindowPartitions extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
+    _.containsPattern(WINDOW), ruleId) {
+    case w @ Window(windowExprs, partitionSpec, _, _) if partitionSpec.exists(_.foldable) =>
+      val newWindowExprs = windowExprs.map(_.transformWithPruning(
+        _.containsPattern(WINDOW_EXPRESSION)) {
+        case windowExpr @ WindowExpression(_, wsd @ WindowSpecDefinition(ps, _, _))
+          if ps.exists(_.foldable) =>
+          val newWsd = wsd.copy(partitionSpec = ps.filter(!_.foldable))
+          windowExpr.copy(windowSpec = newWsd)

Review Comment:
   This reduces object copying.





Re: [PR] [SPARK-45352][SQL] Eliminate foldable window partitions [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43144:
URL: https://github.com/apache/spark/pull/43144#discussion_r1440108285


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateWindowPartitionsSuite.scala:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+
+class EliminateWindowPartitionsSuite extends PlanTest {
+
+  private object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("Eliminate window partitions", FixedPoint(20),
+        EliminateWindowPartitions) :: Nil
+  }
+
+  val testRelation = LocalRelation($"a".int, $"b".int)
+  private val a = testRelation.output(0)
+  private val b = testRelation.output(1)
+  private val windowFrame = SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)
+
+  test("Remove foldable window partitions") {
+    val originalQuery =
+      testRelation
+        .select(a, b,
+          windowExpr(RowNumber(),
+            windowSpec(Literal(1) :: Nil, b.desc :: Nil, windowFrame)).as("rn"))
+
+    val correctAnswer =
+      testRelation
+        .select(a, b,
+          windowExpr(RowNumber(),
+            windowSpec(Nil, b.desc :: Nil, windowFrame)).as("rn"))

Review Comment:
   @beliefer can you help confirm that no partition column means a single partition in the window operator?
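
For illustration only, the equivalence in question can be sketched with plain Scala collections instead of the real window operator. `FoldablePartitionSketch`, `Row`, and `rowNumber` are hypothetical names that are not part of Spark; the sketch assumes that an empty partition spec places every row in one group, which is exactly the assumption being confirmed here.

```scala
// Hypothetical stand-in for a window operator; not Spark code.
object FoldablePartitionSketch {
  final case class Row(a: Int, b: Int)

  // Assigns 1..n within each partition, ordered by `b` descending.
  // `partitionKey = None` models an empty partition spec (one group for all rows).
  def rowNumber[K](rows: Seq[Row], partitionKey: Option[Row => K]): Set[(Row, Int)] = {
    val groups: Iterable[Seq[Row]] = partitionKey match {
      case Some(key) => rows.groupBy(key).values
      case None => Seq(rows)
    }
    groups.flatMap { group =>
      group.sortBy(r => -r.b).zipWithIndex.map { case (r, i) => (r, i + 1) }
    }.toSet
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq(Row(1, 10), Row(2, 30), Row(3, 20))
    // A constant (foldable) key sends every row to the same group.
    val constantKey = rowNumber(rows, Some((_: Row) => 1))
    val noKey = rowNumber[Int](rows, None)
    assert(constantKey == noKey)
    println(noKey.toSeq.sortBy(_._2))
  }
}
```

Under that assumption, a constant key and no key produce the same row numbers, which is what the expected plan in this test relies on.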





Re: [PR] [SPARK-45352][SQL] Eliminate foldable window partitions [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan closed pull request #43144: [SPARK-45352][SQL] Eliminate foldable window partitions
URL: https://github.com/apache/spark/pull/43144

