Posted to issues@spark.apache.org by "Tao Luo (JIRA)" <ji...@apache.org> on 2019/02/08 18:58:00 UTC

[jira] [Commented] (SPARK-21492) Memory leak in SortMergeJoin

    [ https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16763820#comment-16763820 ] 

Tao Luo commented on SPARK-21492:
---------------------------------

If SortMergeJoinScanner doesn't consume the iterator from UnsafeExternalRowSorter entirely, the memory that UnsafeExternalSorter acquired from TaskMemoryManager will never be released. This leads to a memory leak, spills, and OOME. Each partition whose iterator is not fully consumed keeps holding a page.
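
To make the mechanism concrete, here is a toy model in plain Python, not Spark code. ToyMemoryManager, sorted_iterator and merge_join are hypothetical names invented for this illustration, and the join assumes unique keys. The sketch only shows how an inner merge join that stops as soon as one side is exhausted can leave the other side's page unreleased:

```python
class ToyMemoryManager:
    """Hypothetical stand-in for TaskMemoryManager: a fixed budget of pages."""

    def __init__(self, total_pages):
        self.total_pages = total_pages
        self.used_pages = 0

    def acquire_page(self):
        if self.used_pages >= self.total_pages:
            raise MemoryError("unable to acquire a page")
        self.used_pages += 1

    def release_page(self):
        self.used_pages -= 1


def sorted_iterator(rows, manager):
    """Toy sorter: takes one page and frees it only after the last row is consumed."""
    manager.acquire_page()
    for row in sorted(rows):
        yield row
    # Reached only if the consumer drains the iterator completely.
    manager.release_page()


def merge_join(left_iter, right_iter):
    """Inner merge join over two sorted iterators of unique keys.

    Stops as soon as either side runs out, so the other side may be
    left partially consumed.
    """
    out = []
    l = next(left_iter, None)
    r = next(right_iter, None)
    while l is not None and r is not None:
        if l < r:
            l = next(left_iter, None)
        elif l > r:
            r = next(right_iter, None)
        else:
            out.append((l, r))
            l = next(left_iter, None)
            r = next(right_iter, None)
    return out


manager = ToyMemoryManager(total_pages=2)
left = sorted_iterator(range(1, 1001), manager)      # same key ranges as the repro
right = sorted_iterator(range(1000, 2001), manager)
joined = merge_join(left, right)
print(joined, manager.used_pages)
```

In this model the left side is drained and frees its page, while the right side stops at key 1001 and its page stays accounted for. A later operator asking the same manager for a page would then see less memory than the join actually needs.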



{code:python}
from pyspark.sql.functions import rand, col

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn('value', rand())
r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn('value2', rand())
joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show(){code}
 
{code}
== Physical Plan ==
Coalesce 1
+- *(5) SortMergeJoin [timestamp1#52L], [timestamp2#59L], Inner
   :- *(2) Sort [timestamp1#52L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(timestamp1#52L, 200)
   :     +- *(1) Project [id#50L AS timestamp1#52L, rand(-4732263137869282482) AS value#54]
   :        +- *(1) Range (1, 1001, step=1, splits=4)
   +- *(4) Sort [timestamp2#59L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(timestamp2#59L, 200)
         +- *(3) Project [id#57L AS timestamp2#59L, rand(-3625198886289022666) AS value2#61]
            +- *(3) Range (1000, 2001, step=1, splits=4)
{code}

{{org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 21, 10.100.100.10, executor 0): org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0}}
 

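Using a broadcast join instead succeeds:
{code:python}
from pyspark.sql.functions import broadcast
# The broadcast hint on r2 makes the planner pick a broadcast join instead of SortMergeJoin
joined = r1.join(broadcast(r2), r1.timestamp1 == r2.timestamp2, "inner"){code}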
Running on Spark 2.4. 

 

Or if you prefer Scala:
{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, rand}

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

var r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn("value", rand())
var r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn("value2", rand())
var joined = r1.join(r2, col("timestamp1") === col("timestamp2"), "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show(){code}
 

 

Just reproduced it in standalone mode using [https://www.apache.org/dyn/closer.lua/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz] and Python 3.7, with the same code as above.

 

Succeeds with 1 thread: ./bin/pyspark --master local[1]

Fails with >1 thread: ./bin/pyspark --master local[4]
{code:python}
SparkSession available as 'spark'.
>>> from pyspark.sql.functions import rand, col
>>>
>>> spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
>>> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
>>>
>>> r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
>>> r1 = r1.withColumn('value', rand())
>>> r2 = spark.range(1000, 1001).select(col("id").alias("timestamp2"))
>>> r2 = r2.withColumn('value2', rand())
>>> joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
>>> joined = joined.coalesce(1)
>>> joined.show(){code}
 
{code:java}
[Stage 2:>                                                          (0 + 1) / 1]2019-02-06 17:12:27 WARN  TaskMemoryManager:304 - Failed to allocate a page (1900544 bytes), try again.
2019-02-06 17:12:27 ERROR Executor:91 - Exception in task 0.0 in stage 2.0 (TID 6)
org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 28 bytes of memory, got 0
	at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)
	at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:119)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:383)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:407)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:135){code}
 

> Memory leak in SortMergeJoin
> ----------------------------
>
>                 Key: SPARK-21492
>                 URL: https://issues.apache.org/jira/browse/SPARK-21492
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.2.0, 2.3.0, 2.3.1, 3.0.0
>            Reporter: Zhan Zhang
>            Priority: Major
>
> In SortMergeJoin, if the iterator is not exhausted, there will be memory leak caused by the Sort. The memory is not released until the task end, and cannot be used by other operators causing performance drop or OOM.


