Posted to issues@spark.apache.org by "Tao Luo (JIRA)" <ji...@apache.org> on 2019/02/05 00:02:00 UTC

[jira] [Commented] (SPARK-24657) SortMergeJoin may cause SparkOutOfMemory in execution memory because it does not clean up resources when the merge join finishes

    [ https://issues.apache.org/jira/browse/SPARK-24657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16760328#comment-16760328 ] 

Tao Luo commented on SPARK-24657:
---------------------------------

Nice find. This looks like https://issues.apache.org/jira/browse/SPARK-24657 to me.

> SortMergeJoin may cause SparkOutOfMemory in execution memory because it does not clean up resources when the merge join finishes
> ---------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-24657
>                 URL: https://issues.apache.org/jira/browse/SPARK-24657
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.2.0, 2.3.0, 2.3.1
>            Reporter: Joshuawangzj
>            Priority: Major
>
> My SQL query joins three tables, all of them small (about 2 MB each). To work around the small-files issue, I use coalesce(1). But it throws an OOM exception:
> {code:java}
> org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0
> 	at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:159)
> 	at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:99)
> 	at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.<init>(UnsafeInMemorySorter.java:128)
> 	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:162)
> 	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:129)
> 	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:111)
> 	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.create(UnsafeExternalRowSorter.java:96)
> 	at org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:89)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.init(generated.java:22)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10.apply(WholeStageCodegenExec.scala:611)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10.apply(WholeStageCodegenExec.scala:608)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:847)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:847)
> {code}
> {code:java}
> 12:10:51.175 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 3.0 (TID 34, localhost, executor driver): org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0
> 	at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:159)
> 	at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:99)
> 	at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.<init>(UnsafeInMemorySorter.java:128)
> 	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:162)
> 	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:129)
> 	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:111)
> 	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.create(UnsafeExternalRowSorter.java:96)
> 	at org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:89)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.init(generated.java:22)
> {code}
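> For reference, here is a minimal sketch of the reported scenario. The original query and tables are not shown in this report, so the table names, sizes, and output path below are hypothetical, and whether this exact snippet hits the OOM depends on the executor memory configuration:
> {code:scala}
> import org.apache.spark.sql.SparkSession
>
> // Mirror the reported setup: 20 shuffle partitions, and broadcast joins
> // disabled so the planner picks SortMergeJoin for the small tables.
> val spark = SparkSession.builder()
>   .master("local[1]")
>   .config("spark.sql.shuffle.partitions", "20")
>   .config("spark.sql.autoBroadcastJoinThreshold", "-1")
>   .getOrCreate()
> import spark.implicits._
>
> // Three small tables (hypothetical stand-ins for the reporter's tables).
> val t1 = (1 to 100000).map(i => (i, s"a$i")).toDF("id", "a")
> val t2 = (1 to 100000).map(i => (i, s"b$i")).toDF("id", "b")
> val t3 = (1 to 100000).map(i => (i, s"c$i")).toDF("id", "c")
>
> // coalesce(1) makes a single task consume all 20 parent partitions, so
> // any sorter pages that are not freed accumulate within that one task.
> t1.join(t2, "id").join(t3, "id")
>   .coalesce(1)
>   .write.mode("overwrite").parquet("/tmp/smj-oom-repro")
> {code}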
> I finally tracked down the problem by studying the source code. The exception occurs because the task cannot allocate a page (32 MB per page, in my case) from the MemoryManager: coalesce runs 20 parent partitions in a single task (spark.sql.shuffle.partitions=20), and after the sort merge join finishes for each parent partition, the UnsafeExternalRowSorter fails to clean up some of the pages it allocated. After the 14th parent partition (in my case), there is no longer enough execution memory left to acquire a page for sorting.
> Why can't UnsafeExternalRowSorter clean up its pages after the join finishes for a parent partition?
> After repeated attempts, I found that the problem is in SortMergeJoinScanner. UnsafeExternalRowSorter releases its resources only when its iterator has been advanced to the end. But in SortMergeJoinScanner, when the streamedIterator is exhausted, the bufferedIterator may not be, so the bufferedIterator cannot release its resources, and vice versa.
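> The pattern can be illustrated with a condensed model (a simplified, assumed model for illustration only, not the real Spark classes):
> {code:scala}
> // Model of the cleanup contract: pages are freed only once the iterator
> // has been drained to the end, i.e. once hasNext returns false.
> class SorterIterator[T](rows: Iterator[T], freePages: () => Unit) extends Iterator[T] {
>   override def hasNext: Boolean = {
>     val more = rows.hasNext
>     if (!more) freePages() // cleanup fires only on exhaustion
>     more
>   }
>   override def next(): T = rows.next()
> }
>
> object CleanupDemo extends App {
>   var freed = false
>   val buffered = new SorterIterator(Iterator(1, 2, 3), () => freed = true)
>   buffered.next() // the merge join stops early: no more matching keys
>   println(s"buffered pages freed: $freed") // false -- the pages are still held
> }
> {code}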
> Possible solutions:
> 1. Advance the other iterator to its end once one iterator has been fully traversed (see the sketch below). This may hurt performance because of the unnecessary traversal.
> 2. Invoke the sorter's cleanup method directly once one iterator has been fully traversed. This would require a larger change to the source code.
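> A minimal sketch of option 1 (illustrative only, not the actual Spark change; drainToEnd is a hypothetical helper):
> {code:scala}
> object DrainSketch {
>   // Walk the remaining iterator to its end so that its
>   // cleanup-on-exhaustion hook eventually fires.
>   def drainToEnd(it: Iterator[_]): Unit = while (it.hasNext) it.next()
>
>   // In the scanner, once the streamed side is done:
>   //   if (!streamedIter.hasNext) drainToEnd(bufferedIter)
> }
> {code}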


