Posted to commits@spark.apache.org by gu...@apache.org on 2022/08/02 09:06:51 UTC

[spark] branch branch-3.1 updated: [SPARK-39932][SQL] WindowExec should clear the final partition buffer

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new ed842ba8e82 [SPARK-39932][SQL] WindowExec should clear the final partition buffer
ed842ba8e82 is described below

commit ed842ba8e82e845efc38bd115909ce54faef318a
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Tue Aug 2 18:05:48 2022 +0900

    [SPARK-39932][SQL] WindowExec should clear the final partition buffer
    
    ### What changes were proposed in this pull request?
    
    Explicitly clear the final partition buffer in `WindowExec` when no next row can be found. Apply the same fix to `WindowInPandasExec`.
    
    ### Why are the changes needed?
    
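    When a repartition follows a window, a local sort is needed after the window because of the RoundRobinPartitioning shuffle. That sort can fail to acquire memory while the window still holds its final partition buffer.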
    
    The error stack:
    ```java
    ExternalAppendOnlyUnsafeRowArray INFO - Reached spill threshold of 4096 rows, switching to org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
    
    org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0
            at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)
            at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:97)
            at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.growPointerArrayIfNecessary(UnsafeExternalSorter.java:352)
            at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.allocateMemoryForRecordIfNecessary(UnsafeExternalSorter.java:435)
            at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:455)
            at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:138)
            at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:226)
            at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.$anonfun$prepareShuffleDependency$10(ShuffleExchangeExec.scala:355)
    ```
    
    `WindowExec` only clears the buffer in `fetchNextPartition`, so the buffer of the final partition is never cleared.
    
    This is usually not a big problem, since a task completion listener cleans up the resources when the task ends.
    ```java
    taskContext.addTaskCompletionListener(context -> {
      cleanupResources();
    });
    ```
    
    This bug only has an effect when the window is not the last operator in the task and is followed by a memory-consuming operator such as a sort.
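    
    The behaviour described above can be illustrated with a minimal, self-contained sketch. It is not Spark code: `PartitionIterator`, its `clearOnExhaustion` flag, and the `ArrayBuffer` standing in for the partition buffer are all hypothetical names chosen for illustration, and the sketch assumes every partition is non-empty.
    
    ```scala
    import scala.collection.mutable.ArrayBuffer
    
    // Hypothetical model of an iterator that buffers one partition at a time.
    // Assumption: every partition in `partitions` is non-empty.
    final class PartitionIterator(partitions: Iterator[Seq[Int]], clearOnExhaustion: Boolean)
        extends Iterator[Int] {
    
      // Stand-in for the partition buffer (not Spark's real buffer type).
      val buffer: ArrayBuffer[Int] = ArrayBuffer.empty[Int]
      private var bufferIterator: Iterator[Int] = Iterator.empty
    
      private def fetchNextPartition(): Unit = {
        buffer.clear() // before the fix, this is the only place the buffer is cleared
        buffer ++= partitions.next()
        // Iterate over a snapshot so that clearing the buffer later
        // cannot invalidate the iterator.
        bufferIterator = buffer.toList.iterator
      }
    
      override def hasNext: Boolean = {
        val found = bufferIterator.hasNext || partitions.hasNext
        if (!found && clearOnExhaustion) {
          // Mirrors the idea of the fix: release the final partition once exhausted.
          buffer.clear()
        }
        found
      }
    
      override def next(): Int = {
        if (!bufferIterator.hasNext) fetchNextPartition()
        bufferIterator.next()
      }
    }
    
    // Without the extra clear, the last partition stays buffered after iteration.
    val leaky = new PartitionIterator(Iterator(Seq(1, 2), Seq(3)), clearOnExhaustion = false)
    leaky.foreach(_ => ())
    
    // With the extra clear, the buffer is empty once hasNext returns false.
    val fixed = new PartitionIterator(Iterator(Seq(1, 2), Seq(3)), clearOnExhaustion = true)
    val rows = fixed.toList
    ```
    
    In this sketch `leaky.buffer` still holds the last partition after the loop, while `fixed.buffer` is empty, which is why a downstream operator in the same task would have more memory available in the second case.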
    
    ### Does this PR introduce _any_ user-facing change?
    
    yes, bug fix
    
    ### How was this patch tested?
    
    N/A
    
    Closes #37358 from ulysses-you/window.
    
    Authored-by: ulysses-you <ul...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
    (cherry picked from commit 1fac870126c289a7ec75f45b6b61c93b9a4965d4)
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .../apache/spark/sql/execution/python/WindowInPandasExec.scala | 10 ++++++++--
 .../org/apache/spark/sql/execution/window/WindowExec.scala     | 10 ++++++++--
 2 files changed, 16 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
index 983fe9db738..a87024cfcd7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
@@ -353,8 +353,14 @@ case class WindowInPandasExec(
         // Iteration
         var rowIndex = 0
 
-        override final def hasNext: Boolean =
-          (bufferIterator != null && bufferIterator.hasNext) || nextRowAvailable
+        override final def hasNext: Boolean = {
+          val found = (bufferIterator != null && bufferIterator.hasNext) || nextRowAvailable
+          if (!found) {
+            // clear final partition
+            buffer.clear()
+          }
+          found
+        }
 
         override final def next(): Iterator[UnsafeRow] = {
           // Load the next partition if we need to.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
index 6e0e36cbe59..963decb4cf4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
@@ -178,8 +178,14 @@ case class WindowExec(
         // Iteration
         var rowIndex = 0
 
-        override final def hasNext: Boolean =
-          (bufferIterator != null && bufferIterator.hasNext) || nextRowAvailable
+        override final def hasNext: Boolean = {
+          val found = (bufferIterator != null && bufferIterator.hasNext) || nextRowAvailable
+          if (!found) {
+            // clear final partition
+            buffer.clear()
+          }
+          found
+        }
 
         val join = new JoinedRow
         override final def next(): InternalRow = {

