Posted to commits@spark.apache.org by yu...@apache.org on 2023/06/04 07:08:39 UTC

[spark] branch branch-3.4 updated: [SPARK-43911][SQL] Use toSet to deduplicate the iterator data to prevent the creation of large Array

This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 93709918aff [SPARK-43911][SQL] Use toSet to deduplicate the iterator data to prevent the creation of large Array
93709918aff is described below

commit 93709918affba4846a30cbae8692a6a328b5a448
Author: mcdull-zhang <wo...@163.com>
AuthorDate: Sun Jun 4 15:04:33 2023 +0800

    [SPARK-43911][SQL] Use toSet to deduplicate the iterator data to prevent the creation of large Array
    
    ### What changes were proposed in this pull request?
    When SubqueryBroadcastExec reuses the keys of a broadcast HashedRelation for dynamic partition pruning, it collects all the keys into an Array and then calls distinct on that Array to remove duplicates.
    
    A broadcast HashedRelation can hold many rows, and its keys are often highly repetitive. The intermediate Array can therefore occupy a large amount of memory (memory that is not tracked by the MemoryManager), which may trigger an OOM.
    
    This patch instead calls toSet directly on the key iterator to deduplicate, which avoids materializing the large intermediate Array.
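    
    As an illustration, a minimal sketch of the two approaches (Int keys stand in for InternalRow, and keyIter is a hypothetical name, not the actual source):
    ```scala
    // Hypothetical stand-in for the broadcast relation's key stream:
    // 10 million rows but only 100 distinct keys.
    def keyIter: Iterator[Int] = Iterator.tabulate(10000000)(i => i % 100)
    
    // Before: toArray materializes every key into one large Array before
    // distinct runs, so peak memory is proportional to the total row count.
    val before: Array[Int] = keyIter.toArray.distinct
    
    // After: toSet folds the iterator into a hash set as it goes, so peak
    // memory is proportional to the distinct-key count (100 here).
    val after: Array[Int] = keyIter.toSet.toArray
    ```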
    
    ### Why are the changes needed?
    This change avoids OOM failures such as the following:
    ```text
    Exception in thread "dynamicpruning-0" java.lang.OutOfMemoryError: Java heap space
            at scala.collection.mutable.ResizableArray.ensureSize(ResizableArray.scala:106)
            at scala.collection.mutable.ResizableArray.ensureSize$(ResizableArray.scala:96)
            at scala.collection.mutable.ArrayBuffer.ensureSize(ArrayBuffer.scala:49)
            at scala.collection.mutable.ArrayBuffer.$plus$eq(ArrayBuffer.scala:85)
            at scala.collection.mutable.ArrayBuffer.$plus$eq(ArrayBuffer.scala:49)
            at scala.collection.generic.Growable.$anonfun$$plus$plus$eq$1(Growable.scala:62)
            at scala.collection.generic.Growable$$Lambda$7/1514840818.apply(Unknown Source)
            at scala.collection.Iterator.foreach(Iterator.scala:943)
            at scala.collection.Iterator.foreach$(Iterator.scala:943)
            at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
            at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
            at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
            at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
            at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
            at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
            at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
            at scala.collection.AbstractIterator.to(Iterator.scala:1431)
            at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
            at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
            at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
            at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
            at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
            at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
            at org.apache.spark.sql.execution.SubqueryBroadcastExec.$anonfun$relationFuture$2(SubqueryBroadcastExec.scala:92)
            at org.apache.spark.sql.execution.SubqueryBroadcastExec$$Lambda$4212/5099232.apply(Unknown Source)
            at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withExecutionId$1(SQLExecution.scala:140)
    ```
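    
    The frames above show Iterator.toArray routing through TraversableOnce.toBuffer, i.e. appending every key to an ArrayBuffer that keeps reallocating larger backing arrays. A simplified sketch of that allocation pattern (not the actual library source):
    ```scala
    import scala.collection.mutable.ArrayBuffer
    
    // Rough equivalent of the toArray path in the trace: each element is
    // appended to an ArrayBuffer, and ensureSize repeatedly grows and
    // copies the backing array until heap space runs out.
    def toArrayViaBuffer[A](it: Iterator[A]): ArrayBuffer[A] = {
      val buf = new ArrayBuffer[A]()
      it.foreach(buf += _) // the Growable.++= / ArrayBuffer.+= frames
      buf
    }
    ```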
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Manually verified in a production environment; existing unit tests pass.
    
    Closes #41419 from mcdull-zhang/reduce_memory_usage.
    
    Authored-by: mcdull-zhang <wo...@163.com>
    Signed-off-by: Yuming Wang <yu...@ebay.com>
    (cherry picked from commit 595ad30e6259f7e4e4252dfee7704b73fd4760f7)
    Signed-off-by: Yuming Wang <yu...@ebay.com>
---
 .../scala/org/apache/spark/sql/execution/SubqueryBroadcastExec.scala    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryBroadcastExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryBroadcastExec.scala
index 22d042ccefb..80f863515d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryBroadcastExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryBroadcastExec.scala
@@ -93,7 +93,7 @@ case class SubqueryBroadcastExec(
         val rows = if (broadcastRelation.keyIsUnique) {
           keyIter.toArray[InternalRow]
         } else {
-          keyIter.toArray[InternalRow].distinct
+          keyIter.toSet[InternalRow].toArray
         }
         val beforeBuild = System.nanoTime()
         longMetric("collectTime") += (beforeBuild - beforeCollect) / 1000000
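
One behavioral note, as an observation rather than something stated in the patch: Array.distinct preserves first-encounter order, while toSet.toArray does not. That should be harmless here, since the deduplicated rows are consumed as an unordered set of pruning keys. A quick check of the element-level equivalence:

```scala
// distinct keeps first-encounter order:
assert(Iterator(3, 1, 3, 2).toArray.distinct.sameElements(Array(3, 1, 2)))
// toSet.toArray yields the same elements, with no guaranteed order:
assert(Iterator(3, 1, 3, 2).toSet == Set(1, 2, 3))
```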

