Posted to issues@spark.apache.org by "t oo (JIRA)" <ji...@apache.org> on 2019/02/15 20:38:00 UTC

[jira] [Commented] (SPARK-25998) TorrentBroadcast holds strong reference to broadcast object

    [ https://issues.apache.org/jira/browse/SPARK-25998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16769708#comment-16769708 ] 

t oo commented on SPARK-25998:
------------------------------

Any chance of a backport to 2.x?

> TorrentBroadcast holds strong reference to broadcast object
> -----------------------------------------------------------
>
>                 Key: SPARK-25998
>                 URL: https://issues.apache.org/jira/browse/SPARK-25998
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 2.4.0
>            Reporter: Brandon Krieger
>            Assignee: Brandon Krieger
>            Priority: Major
>             Fix For: 3.0.0
>
>
> If we run a large number of broadcast joins while holding onto the resulting Dataset references, the Datasets keep the values of their broadcast objects reachable, which retains a large amount of memory. The broadcast object is also held in the MemoryStore, but the MemoryStore cleans itself up to keep its memory usage below a certain level. In my use case, I don't want to release the reference to the Dataset (which would allow the broadcast object to be GCed), because I want to be able to unpersist it at some point in the future, when it is no longer relevant.
> See the following repro in Spark shell:
> {code:scala}
> import org.apache.spark.sql.functions._
> import org.apache.spark.SparkEnv
> val startDf = (1 to 1000000).toDF("num").withColumn("num", $"num".cast("string")).cache()
> val leftDf = startDf.withColumn("num", concat($"num", lit("0")))
> val rightDf = startDf.withColumn("num", concat($"num", lit("1")))
> val broadcastJoinedDf = leftDf.join(broadcast(rightDf), leftDf.col("num").eqNullSafe(rightDf.col("num")))
> broadcastJoinedDf.count
> // Take a heap dump, see UnsafeHashedRelation with hard references in MemoryStore and Dataset
> // Force the MemoryStore to clear itself
> SparkEnv.get.blockManager.stop
> // Trigger GC, then take another Heap Dump. The UnsafeHashedRelation is now referenced only by the Dataset.
> {code}
> If we make the TorrentBroadcast hold a weak reference to the broadcast object, the second heap dump no longer contains the UnsafeHashedRelation, because it has been GCed.
> Given that the broadcast object can be reloaded from the MemoryStore, it seems like it would be alright to use a WeakReference instead.
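
The WeakReference idea proposed in the description can be illustrated with a minimal, self-contained sketch. This is not Spark's TorrentBroadcast code: the class `WeaklyCached` and its `load` function (standing in for a read from the MemoryStore) are hypothetical names introduced only for illustration.

```scala
import java.lang.ref.WeakReference

// Hypothetical holder, NOT Spark's TorrentBroadcast. It keeps only a weak
// reference to the value and calls `load` again once the value has been
// garbage collected. `load` stands in for re-reading from the MemoryStore.
final class WeaklyCached[T <: AnyRef](load: () => T) {
  // Starts out empty: a weak reference whose referent is null.
  private var ref: WeakReference[T] = new WeakReference[T](null.asInstanceOf[T])

  def value: T = synchronized {
    val cached = ref.get()
    if (cached != null) {
      cached // still reachable, so no reload is needed
    } else {
      val fresh = load() // collected (or never loaded): load it again
      ref = new WeakReference[T](fresh)
      fresh
    }
  }
}

object WeaklyCachedDemo {
  def main(args: Array[String]): Unit = {
    var loads = 0
    val holder = new WeaklyCached[Array[Byte]](() => { loads += 1; new Array[Byte](1024) })
    val first = holder.value  // triggers the first load
    val second = holder.value // `first` is still strongly held, so the cached value is reused
    println(s"same instance: ${first eq second}, loads: $loads")
  }
}
```

With this shape, the holder by itself never keeps the value alive. Once no caller holds a strong reference, the GC is free to reclaim the value, and the next access pays the cost of `load` again.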


