Posted to reviews@spark.apache.org by bkrieger <gi...@git.apache.org> on 2018/11/09 22:35:26 UTC

[GitHub] spark pull request #22995: [SPARK-25998] [CORE] Change TorrentBroadcast to h...

GitHub user bkrieger opened a pull request:

    https://github.com/apache/spark/pull/22995

    [SPARK-25998] [CORE] Change TorrentBroadcast to hold weak reference of broadcast object

    ## What changes were proposed in this pull request?
    
    This PR changes the broadcast object in TorrentBroadcast from a strong reference to a weak reference. This allows it to be garbage collected even if the Dataset is held in memory. This is ok, because the broadcast object can always be re-read.
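    
    A minimal sketch of the idea, not the patch itself. The holder keeps only a `WeakReference` to the value. Whenever the GC has cleared that reference, the holder calls `reload`, a hypothetical stand-in for re-reading the broadcast blocks. `WeaklyMemoized`, `reload`, and `simulateGc` are illustrative names, and the class is deliberately not thread-safe.
    
    ```scala
    import java.lang.ref.WeakReference
    
    // Hypothetical holder; `reload` stands in for re-reading the broadcast blocks.
    // Not thread-safe: two callers can both see an empty reference and both reload.
    class WeaklyMemoized[T](reload: () => T) {
      // Only a weak reference is kept, so the GC may clear it once nothing else holds the value.
      private var ref: WeakReference[T] = null
    
      def get(): T = {
        // Dereference once into a local so the value can't vanish between the check and the return.
        val cached: T = if (ref == null) null.asInstanceOf[T] else ref.get()
        if (cached != null) {
          cached
        } else {
          val fresh = reload()
          ref = new WeakReference[T](fresh)
          fresh
        }
      }
    
      // Test hook: clears the referent, as a GC would once the value is only weakly reachable.
      def simulateGc(): Unit = if (ref != null) ref.clear()
    }
    ```
    
    After `simulateGc()`, the next `get()` calls `reload` again. Losing the value costs a re-read but does not affect correctness.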
    
    ## How was this patch tested?
    
    Tested in the Spark shell by taking a heap dump; full repro steps are listed in https://issues.apache.org/jira/browse/SPARK-25998.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bkrieger/spark bk/torrent-broadcast-weak

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22995.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22995
    
----
commit a2683b62985fc9c7d15fb92f3bb170a4b5225058
Author: Brandon Krieger <bk...@...>
Date:   2018-11-08T23:04:06Z

    use weak reference for torrent broadcast

commit 99fbeecf43a289648a56d178fa55e188ce75bdb7
Author: Brandon Krieger <bk...@...>
Date:   2018-11-09T21:04:51Z

    fix compile

commit 5e0a179c168a70b0166abe4bb51a1d26a2f1d666
Author: Brandon Krieger <bk...@...>
Date:   2018-11-09T21:33:22Z

    fix

commit 1908b5b8dfa6c0b55db3bd9a90e21ca713e5bf25
Author: Brandon Krieger <bk...@...>
Date:   2018-11-09T21:48:44Z

    no npe

commit 24183e5b8b63e0b4e117856ab4de7eb1b0ea6c9a
Author: Brandon Krieger <bk...@...>
Date:   2018-11-09T21:52:21Z

    no option

commit f212da322242386ce3b71e9961a964e60b587287
Author: Brandon Krieger <bk...@...>
Date:   2018-11-09T22:08:23Z

    typo

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22995: [SPARK-25998] [CORE] Change TorrentBroadcast to h...

Posted by bkrieger <gi...@git.apache.org>.
Github user bkrieger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22995#discussion_r236724749
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -92,8 +95,15 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
       /** The checksum for all the blocks. */
       private var checksums: Array[Int] = _
     
    -  override protected def getValue() = {
    -    _value
    +  override protected def getValue() = synchronized {
    --- End diff --
    
    Do you mean switching `TorrentBroadcast.synchronized` to `broadcastCache.synchronized` inside `readBroadcastBlock`, or changing `this.synchronized` to `broadcastCache.synchronized` inside `getValue()` (and getting rid of the lock in `readBroadcastBlock`)?



---



[GitHub] spark issue #22995: [SPARK-25998] [CORE] Change TorrentBroadcast to hold wea...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22995
  
    Can one of the admins verify this patch?


---



[GitHub] spark pull request #22995: [SPARK-25998] [CORE] Change TorrentBroadcast to h...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22995#discussion_r236394496
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -93,7 +96,14 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
       private var checksums: Array[Int] = _
     
       override protected def getValue() = {
    -    _value
    +    val memoized: T = if (_value == null) null.asInstanceOf[T] else _value.get
    --- End diff --
    
    Hm, weird. I thought it would work based on a little local example, but yeah, leave the cast in, of course.


---



[GitHub] spark pull request #22995: [SPARK-25998] [CORE] Change TorrentBroadcast to h...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22995#discussion_r236721318
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -92,8 +95,15 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
       /** The checksum for all the blocks. */
       private var checksums: Array[Int] = _
     
    -  override protected def getValue() = {
    -    _value
    +  override protected def getValue() = synchronized {
    --- End diff --
    
    Hm, that `TorrentBroadcast.synchronized` is from a very old version of the code in 2013, when more things used that lock. I'm pretty certain it's obsolete. However, this code accesses broadcastCache, and that needs synchronization. (It's kind of unfortunate where this object lives.) I think we could actually improve this by locking on broadcastCache for basically the whole block. I think that's a safe improvement: nothing else uses broadcastCache, nothing else is synchronized here, and it should behave just the same.


---



[GitHub] spark pull request #22995: [SPARK-25998] [CORE] Change TorrentBroadcast to h...

Posted by bkrieger <gi...@git.apache.org>.
Github user bkrieger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22995#discussion_r236715555
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -92,8 +95,15 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
       /** The checksum for all the blocks. */
       private var checksums: Array[Int] = _
     
    -  override protected def getValue() = {
    -    _value
    +  override protected def getValue() = synchronized {
    --- End diff --
    
    I think we can remove the `TorrentBroadcast.synchronized` in `readBroadcastBlock`, since we're already synchronizing in its only caller? Though I'm not sure why it was necessary in the first place, as `readBroadcastBlock` should only have been called once before this PR.
    
    Regardless, I agree that the perf hit should be ok. Let me know if you want any of this changed.


---



[GitHub] spark issue #22995: [SPARK-25998] [CORE] Change TorrentBroadcast to hold wea...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22995
  
    **[Test build #4443 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4443/testReport)** for PR 22995 at commit [`09ae762`](https://github.com/apache/spark/commit/09ae762962098e58be7ba8f777f9dffde2f81d81).


---



[GitHub] spark pull request #22995: [SPARK-25998] [CORE] Change TorrentBroadcast to h...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22995#discussion_r236726776
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -92,8 +95,15 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
       /** The checksum for all the blocks. */
       private var checksums: Array[Int] = _
     
    -  override protected def getValue() = {
    -    _value
    +  override protected def getValue() = synchronized {
    --- End diff --
    
    We need two locks, one to protect the local reference to the block, and one to protect the shared cache object. I was thinking of the former.
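    
    A sketch of the two-lock arrangement under stated assumptions. `SharedCache` is a hypothetical process-wide map standing in for the shared cache, and `CachedHolder` stands in for one broadcast's local reference. Lock #1 is the holder's own monitor. Lock #2 is the map's monitor and is always taken second, so there is a single lock order. To keep the sketch short, the map holds strong references. A real cache would need weak or soft values, or eviction, for the local `SoftReference` to make any difference.
    
    ```scala
    import java.lang.ref.SoftReference
    import scala.collection.mutable
    
    // Hypothetical process-wide cache keyed by id (stand-in for the shared broadcast cache).
    object SharedCache {
      private val values = mutable.HashMap.empty[Long, Any]
    
      // Lock #2: the map's own monitor guards the shared state. The load runs while this
      // lock is held, so holders with the same id never load the value twice.
      def getOrLoad(id: Long, load: () => Any): Any = values.synchronized {
        values.getOrElseUpdate(id, load())
      }
    }
    
    // Hypothetical per-broadcast holder with its own local reference.
    class CachedHolder[T](id: Long, load: () => T) {
      private var ref: SoftReference[T] = null
    
      // Lock #1: this instance's monitor guards `ref`. It is always taken before lock #2,
      // so as long as `load` itself takes neither lock, the two cannot deadlock.
      def get(): T = synchronized {
        val cached: T = if (ref == null) null.asInstanceOf[T] else ref.get()
        if (cached != null) {
          cached
        } else {
          val fresh = SharedCache.getOrLoad(id, load).asInstanceOf[T]
          ref = new SoftReference[T](fresh)
          fresh
        }
      }
    }
    ```
    
    Because the map lock is held across `load()`, loads for different ids are serialized. This sketch accepts that loss of concurrency in exchange for simpler code.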


---



[GitHub] spark issue #22995: [SPARK-25998] [CORE] Change TorrentBroadcast to hold wea...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22995
  
    **[Test build #4443 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4443/testReport)** for PR 22995 at commit [`09ae762`](https://github.com/apache/spark/commit/09ae762962098e58be7ba8f777f9dffde2f81d81).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #22995: [SPARK-25998] [CORE] Change TorrentBroadcast to h...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22995#discussion_r234396107
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -93,7 +96,14 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
       private var checksums: Array[Int] = _
     
       override protected def getValue() = {
    -    _value
    +    val memoized: T = if (_value == null) null.asInstanceOf[T] else _value.get
    --- End diff --
    
    I suppose there is a race condition here, in that several threads could end up simultaneously setting the reference. It won't be incorrect, as the data ought to be the same. I am not sure of the access pattern for this object; maybe it's always single-threaded. But if lots of threads are reading, you can imagine them all calling `readBroadcastBlock()` simultaneously.
    
    Introducing another object to lock on is safe and not too much extra legwork. Might be worth it.
    
    Isn't a WeakReference cleared on any GC? Would a SoftReference be better, holding on until memory is exhausted, to avoid re-reading? There's a tradeoff there.
    
    Good idea, just surprisingly full of possible gotchas.
    
    Nit: isn't `val memoized = if (_value == null) null else _value.get` sufficient?
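    
    A sketch of the race and a synchronized fix, assuming a hypothetical `reload` function in place of `readBroadcastBlock()`. Eight threads call `get()` on an empty holder at once. Because `get()` synchronizes on the instance, only the first thread reloads; the others wait and then find the cached value. The sketch uses `SoftReference`, which is guaranteed to be cleared before an `OutOfMemoryError` and which JVMs are encouraged not to clear while recently used. A `WeakReference`, by contrast, is cleared as soon as a GC finds the value only weakly reachable. `SoftlyMemoized` and `RaceDemo` are illustrative names.
    
    ```scala
    import java.lang.ref.SoftReference
    import java.util.concurrent.atomic.AtomicInteger
    
    // Hypothetical holder; `reload` stands in for readBroadcastBlock().
    class SoftlyMemoized[T](reload: () => T) {
      private var ref: SoftReference[T] = null
    
      // Synchronizing on `this`: callers that all find the reference empty queue up
      // here instead of each triggering its own reload.
      def get(): T = synchronized {
        val cached: T = if (ref == null) null.asInstanceOf[T] else ref.get()
        if (cached != null) {
          cached
        } else {
          val fresh = reload()
          ref = new SoftReference[T](fresh)
          fresh
        }
      }
    }
    
    object RaceDemo {
      // Starts `n` threads that call get() concurrently.
      // Returns (number of reloads, whether every thread saw the same instance).
      def run(n: Int): (Int, Boolean) = {
        val loads = new AtomicInteger(0)
        val holder = new SoftlyMemoized[Array[Byte]](() => {
          loads.incrementAndGet()
          Thread.sleep(50) // widen the window in which other threads arrive
          new Array[Byte](1024)
        })
        val results = new Array[Array[Byte]](n)
        val threads = (0 until n).map(i => new Thread(() => { results(i) = holder.get() }))
        threads.foreach(_.start())
        threads.foreach(_.join())
        (loads.get(), results.forall(_ eq results(0)))
      }
    }
    ```
    
    If `synchronized` is dropped from `get()`, several threads can reload at once. The result would still be correct, because every reload yields the same data, but the work would be repeated.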


---



[GitHub] spark pull request #22995: [SPARK-25998] [CORE] Change TorrentBroadcast to h...

Posted by bkrieger <gi...@git.apache.org>.
Github user bkrieger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22995#discussion_r236775302
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -92,8 +95,15 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
       /** The checksum for all the blocks. */
       private var checksums: Array[Int] = _
     
    -  override protected def getValue() = {
    -    _value
    +  override protected def getValue() = synchronized {
    --- End diff --
    
    done


---



[GitHub] spark issue #22995: [SPARK-25998] [CORE] Change TorrentBroadcast to hold wea...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22995
  
    **[Test build #4444 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4444/testReport)** for PR 22995 at commit [`09ae762`](https://github.com/apache/spark/commit/09ae762962098e58be7ba8f777f9dffde2f81d81).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22995: [SPARK-25998] [CORE] Change TorrentBroadcast to hold wea...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the issue:

    https://github.com/apache/spark/pull/22995
  
    ok to test


---



[GitHub] spark issue #22995: [SPARK-25998] [CORE] Change TorrentBroadcast to hold wea...

Posted by bkrieger <gi...@git.apache.org>.
Github user bkrieger commented on the issue:

    https://github.com/apache/spark/pull/22995
  
    @srowen @mridulm For some reason, it looks like tests aren't being triggered. Can one of you trigger them?


---



[GitHub] spark pull request #22995: [SPARK-25998] [CORE] Change TorrentBroadcast to h...

Posted by bkrieger <gi...@git.apache.org>.
Github user bkrieger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22995#discussion_r236391043
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -93,7 +96,14 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
       private var checksums: Array[Int] = _
     
       override protected def getValue() = {
    -    _value
    +    val memoized: T = if (_value == null) null.asInstanceOf[T] else _value.get
    --- End diff --
    
    Good catch. I'll make it synchronized, so it only loads one at a time.
    
    Re: WeakReference, sure, I can change it to SoftReference. That'll be closer to the original behavior, and should still give the improvement we want.
    
    When I try without `.asInstanceOf[T]`, it fails to compile with:
    ```
    [error] /Users/bkrieger/Documents/git/spark/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala:98: type mismatch;
    [error]  found   : Null(null)
    [error]  required: T
    [error]     val memoized: T = if (_value == null) null else _value.get
    [error]                                           ^
    [info] Null(null) <: T?
    [info] false
    [error] one error found
    ```
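    
    For reference, here is a small sketch of why the cast is needed. The names `NullForT`, `deref`, and `derefBounded` are hypothetical. For an unbounded type parameter `T`, `Null` is not known to conform to `T`, so a bare `null` is rejected while `null.asInstanceOf[T]` compiles. A lower bound `T >: Null` would also make plain `null` legal, but it rules out value types such as `Int` as the type argument.
    
    ```scala
    import java.lang.ref.SoftReference
    
    object NullForT {
      // Unbounded T: a bare `null` fails with "Null(null) <: T? false", so cast it.
      def deref[T](ref: SoftReference[T]): T =
        if (ref == null) null.asInstanceOf[T] else ref.get()
    
      // With a lower bound of Null, plain `null` conforms to T,
      // at the price of excluding value types like Int as T.
      def derefBounded[T >: Null](ref: SoftReference[T]): T =
        if (ref == null) null else ref.get()
    }
    ```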


---



[GitHub] spark issue #22995: [SPARK-25998] [CORE] Change TorrentBroadcast to hold wea...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the issue:

    https://github.com/apache/spark/pull/22995
  
    Merged to master


---



[GitHub] spark issue #22995: [SPARK-25998] [CORE] Change TorrentBroadcast to hold wea...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22995
  
    **[Test build #4444 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4444/testReport)** for PR 22995 at commit [`09ae762`](https://github.com/apache/spark/commit/09ae762962098e58be7ba8f777f9dffde2f81d81).


---



[GitHub] spark pull request #22995: [SPARK-25998] [CORE] Change TorrentBroadcast to h...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22995#discussion_r236471154
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -92,8 +95,15 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
       /** The checksum for all the blocks. */
       private var checksums: Array[Int] = _
     
    -  override protected def getValue() = {
    -    _value
    +  override protected def getValue() = synchronized {
    --- End diff --
    
    I noticed that below, the `readBroadcastBlock()` method is synchronized on the companion object, which makes me nervous. However, a lazy val is also implemented with `this.synchronized`, so I suspect this is fine. We pay the cost of synchronization on every call now, but I think that is OK here, as a simple flag won't tell us whether the SoftReference has been cleared.


---



[GitHub] spark pull request #22995: [SPARK-25998] [CORE] Change TorrentBroadcast to h...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/22995


---
