Posted to commits@spark.apache.org by gu...@apache.org on 2022/03/27 00:32:12 UTC

[spark] branch branch-3.3 updated: [SPARK-38308][SQL] Eagerly iterate over sequence of window expressions in `ExtractWindowExpressions`

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 7fcdd71  [SPARK-38308][SQL] Eagerly iterate over sequence of window expressions in `ExtractWindowExpressions`
7fcdd71 is described below

commit 7fcdd71fa976d910425143c9e733385c7cba37ca
Author: Bruce Robbins <be...@gmail.com>
AuthorDate: Sun Mar 27 09:29:49 2022 +0900

    [SPARK-38308][SQL] Eagerly iterate over sequence of window expressions in `ExtractWindowExpressions`
    
    ### What changes were proposed in this pull request?
    
    Pass an `IndexedSeq` (likely a `Vector`) to `ExtractWindowExpressions.extract` and `ExtractWindowExpressions.addWindow` rather than whatever sequence type was specified by the user (in the `Dataset.select` method).
    
    To accomplish this, we only need to pass an `IndexedSeq` to `ExtractWindowExpressions.extract`, which will then return another `IndexedSeq` that we pass on to `ExtractWindowExpressions.addWindow`.
    
    ### Why are the changes needed?
    
    Consider this query:
    ```
    val df = spark.range(0, 20).map { x =>
      (x % 4, x + 1, x + 2)
    }.toDF("a", "b", "c")
    
    import org.apache.spark.sql.expressions._
    
    val w = Window.partitionBy("a").orderBy("b")
    val selectExprs = Stream(
      sum("c").over(w.rowsBetween(Window.unboundedPreceding, Window.currentRow)).as("sumc"),
      avg("c").over(w.rowsBetween(Window.unboundedPreceding, Window.currentRow)).as("avgc")
    )
    
    df.select(selectExprs: _*).show(false)
    ```
    It fails with
    ```
    org.apache.spark.sql.AnalysisException: Resolved attribute(s) avgc#23 missing from c#16L,a#14L,b#15L,sumc#21L in operator !Project [c#16L, a#14L, b#15L, sumc#21L, sumc#21L, avgc#23].;
    ```
    If you change `Stream` to a `Seq`, it succeeds.
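
    On unpatched versions, a hypothetical user-side workaround (not part of this patch) is to force the lazy sequence into a strict collection before calling `select`:
    ```
    df.select(selectExprs.toList: _*).show(false)
    ```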
    
    As with SPARK-38221 and SPARK-38528, this is due to the use of this code pattern:
    
    ```
      def someMethod(seq: Seq[X]): Unit = {
        ...
        val outerDataStructure = ... // create outer data structure
        val newSeq = seq.map { x =>
          ...
          // code that puts something in outerDataStructure
          ...
        }
        ...
        // code that uses outerDataStructure (and expects it to be populated)
        ...
      }
    ```
    If `seq` is a `Stream`, `seq.map` is evaluated lazily (only the strict head is computed up front), so `outerDataStructure` will not be fully populated before it is used.
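
    A minimal, self-contained sketch of the pitfall (the names are illustrative, not from the Spark code base; the behavior shown is that of Scala 2.12's `Stream`, whose head is strict and tail is lazy):
    ```
    import scala.collection.mutable.ArrayBuffer

    val buffer = ArrayBuffer.empty[Int]   // the "outer data structure"
    val seq: Seq[Int] = Stream(1, 2, 3)   // lazy sequence with a strict head
    val mapped = seq.map { x =>
      buffer += x                         // side effect inside map
      x * 2
    }
    println(buffer)                       // ArrayBuffer(1): tail not forced yet
    mapped.foreach(_ => ())               // force the rest of the Stream
    println(buffer)                       // ArrayBuffer(1, 2, 3)
    ```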
    
    Both `ExtractWindowExpressions.extract` and `ExtractWindowExpressions.addWindow` use this pattern. The failure in the example above, however, stems from the pattern's use in `ExtractWindowExpressions.addWindow`: `extractedWindowExprBuffer` does not get fully populated, so the Window operator does not produce the output expected by its parent projection.
    
    I chose `IndexedSeq` not for its efficient indexing, but because `map` will eagerly iterate over it.
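
    Continuing the sketch above, converting the lazy sequence to an `IndexedSeq` before the `map` forces the side effects to run for every element up front:
    ```
    val buffer2 = ArrayBuffer.empty[Int]
    val eager = Stream(1, 2, 3).toIndexedSeq.map { x =>
      buffer2 += x
      x * 2
    }
    println(buffer2)                      // ArrayBuffer(1, 2, 3): fully populated
    ```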
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New unit test.
    
    Closes #35635 from bersprockets/window_expression_stream_issue.
    
    Authored-by: Bruce Robbins <be...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
    (cherry picked from commit eb30a27e53158e64fffaa6d32ff9369ffbae0384)
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  2 +-
 .../spark/sql/DataFrameWindowFunctionsSuite.scala  | 24 ++++++++++++++++++++++
 2 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 5289983..6d95067 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -3141,7 +3141,7 @@ class Analyzer(override val catalogManager: CatalogManager)
       // have been resolved.
       case p @ Project(projectList, child)
         if hasWindowFunction(projectList) && !p.expressions.exists(!_.resolved) =>
-        val (windowExpressions, regularExpressions) = extract(projectList)
+        val (windowExpressions, regularExpressions) = extract(projectList.toIndexedSeq)
         // We add a project to get all needed expressions for window expressions from the child
         // of the original Project operator.
         val withProject = Project(regularExpressions, child)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
index 11b2309..4676f8b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
@@ -1122,4 +1122,28 @@ class DataFrameWindowFunctionsSuite extends QueryTest
       assert(shuffleByRequirement, "Can't find desired shuffle node from the query plan")
     }
   }
+
+  test("SPARK-38308: Properly handle Stream of window expressions") {
+    val df = Seq(
+      (1, 2, 3),
+      (1, 3, 4),
+      (2, 4, 5),
+      (2, 5, 6)
+    ).toDF("a", "b", "c")
+
+    val w = Window.partitionBy("a").orderBy("b")
+    val selectExprs = Stream(
+      sum("c").over(w.rowsBetween(Window.unboundedPreceding, Window.currentRow)).as("sumc"),
+      avg("c").over(w.rowsBetween(Window.unboundedPreceding, Window.currentRow)).as("avgc")
+    )
+    checkAnswer(
+      df.select(selectExprs: _*),
+      Seq(
+        Row(3, 3),
+        Row(7, 3.5),
+        Row(5, 5),
+        Row(11, 5.5)
+      )
+    )
+  }
 }
