Posted to reviews@spark.apache.org by JoshRosen <gi...@git.apache.org> on 2014/10/19 09:07:43 UTC

[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

GitHub user JoshRosen opened a pull request:

    https://github.com/apache/spark/pull/2844

    [SPARK-3958] TorrentBroadcast cleanup / debugging improvements.

    This PR makes several changes to TorrentBroadcast in order to make
    it easier to reason about, which should help when debugging SPARK-3958.
    The key changes:
    
    - Remove all state from the global TorrentBroadcast object.  This state
      consisted mainly of configuration options, like the block size and
      compression codec, and was read by the blockify / unblockify methods.
      Unfortunately, the use of `lazy val` for `BLOCK_SIZE` meant that the block
      size was always determined by the first SparkConf that TorrentBroadcast was
      initialized with; as a result, unit tests could not properly test
      TorrentBroadcast with different block sizes.
    
      Instead, blockifyObject and unBlockifyObject now accept compression codecs
      and blockSizes as arguments (see the sketch after this list).  These
      arguments are supplied at the call sites inside of TorrentBroadcast
      instances.  Each TorrentBroadcast instance
      determines these values from SparkEnv's SparkConf.  I was careful to ensure
      that we do not accidentally serialize CompressionCodec or SparkConf objects
      as part of the TorrentBroadcast object.
    
    - Remove special-case handling of local-mode in TorrentBroadcast.  I don't
      think that broadcast implementations should know about whether we're running
      in local mode.  If we want to optimize the performance of broadcast in local
      mode, then we should detect this at a higher level and use a dummy
      LocalBroadcastFactory implementation instead.
    
      Removing this code fixes a subtle error condition: in the old local mode
      code, a failure to find the broadcast in the local BlockManager would lead
      to an attempt to deblockify zero blocks, which could lead to confusing
      deserialization or decompression errors when we attempted to decompress
      an empty byte array.  This should never have happened, though: a failure to
      find the block in local mode is evidence of some other error.  The changes
      here will make it easier to debug those errors if they ever happen.
    
    - Add a check that throws an exception when attempting to deblockify an
      empty array.
    
    - Use ScalaCheck to add a test to check that TorrentBroadcast's
      blockifyObject and unBlockifyObject methods are inverses.
    
    - Misc. cleanup and logging improvements.
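
    As a rough illustration of the blockify / unblockify change above, here is a
    self-contained sketch in the same spirit (simplified stand-ins only: plain Java
    serialization and GZIP instead of Spark's Serializer and CompressionCodec, a
    single byte array instead of ByteArrayChunkOutputStream, and `BlockifySketch`
    is just an illustrative name; the real signatures in TorrentBroadcast.scala
    may differ):

    ```scala
    import java.io._
    import java.nio.ByteBuffer
    import java.util.zip.{GZIPInputStream, GZIPOutputStream}

    // Sketch: every dependency (block size, compression) is an explicit argument
    // rather than global object state, and unblockifying an empty array fails fast.
    object BlockifySketch {
      def blockifyObject[T](obj: T, blockSize: Int, compress: Boolean): Array[ByteBuffer] = {
        val bos = new ByteArrayOutputStream()
        val out: OutputStream = if (compress) new GZIPOutputStream(bos) else bos
        val oos = new ObjectOutputStream(out)
        oos.writeObject(obj)
        oos.close()
        bos.toByteArray.grouped(blockSize).map(chunk => ByteBuffer.wrap(chunk)).toArray
      }

      def unBlockifyObject[T](blocks: Array[ByteBuffer], compress: Boolean): T = {
        require(blocks.nonEmpty, "Cannot unblockify an empty array of blocks")
        val bytes = blocks.flatMap { b =>
          val chunk = new Array[Byte](b.remaining())
          b.get(chunk)
          chunk
        }
        val in = new ByteArrayInputStream(bytes)
        val ois = new ObjectInputStream(if (compress) new GZIPInputStream(in) else in)
        ois.readObject().asInstanceOf[T]
      }
    }
    ```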

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/JoshRosen/spark torrentbroadcast-bugfix

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/2844.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2844
    
----
commit 48c98c1996c87cebbd0669924f57527b8e81c35e
Author: Josh Rosen <jo...@databricks.com>
Date:   2014-10-19T06:36:49Z

    [SPARK-3958] TorrentBroadcast cleanup / debugging improvements.
    
    This PR makes several changes to TorrentBroadcast in order to make
    it easier to reason about, which should help when debugging SPARK-3958.
    The key changes:
    
    - Remove all state from the global TorrentBroadcast object.  This state
      consisted mainly of configuration options, like the block size and
      compression codec, and was read by the blockify / unblockify methods.
      Unfortunately, the use of `lazy val` for `BLOCK_SIZE` meant that the block
      size was always determined by the first SparkConf that TorrentBroadcast was
      initialized with; as a result, unit tests could not properly test
      TorrentBroadcast with different block sizes.
    
      Instead, blockifyObject and unBlockifyObject now accept compression codecs
      and blockSizes as arguments.  These arguments are supplied at the call sites
      inside of TorrentBroadcast instances.  Each TorrentBroadcast instance
      determines these values from SparkEnv's SparkConf.  I was careful to ensure
      that we do not accidentally serialize CompressionCodec or SparkConf objects
      as part of the TorrentBroadcast object.
    
    - Remove special-case handling of local-mode in TorrentBroadcast.  I don't
      think that broadcast implementations should know about whether we're running
      in local mode.  If we want to optimize the performance of broadcast in local
      mode, then we should detect this at a higher level and use a dummy
      LocalBroadcastFactory implementation instead.
    
      Removing this code fixes a subtle error condition: in the old local mode
      code, a failure to find the broadcast in the local BlockManager would lead
      to an attempt to deblockify zero blocks, which could lead to confusing
      deserialization or decompression errors when we attempted to decompress
      an empty byte array.  This should never have happened, though: a failure to
      find the block in local mode is evidence of some other error.  The changes
      here will make it easier to debug those errors if they ever happen.
    
    - Add a check that throws an exception when attempting to deblockify an
      empty array.
    
    - Use ScalaCheck to add a test to check that TorrentBroadcast's
      blockifyObject and unBlockifyObject methods are inverses.
    
    - Misc. cleanup and logging improvements.

----



[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/2844#issuecomment-59641733
  
    Also, /cc @davies, who helped me to spot the "local mode might deblockify an empty array" bug and who's been working on TorrentBroadcast optimizations.



[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2844#discussion_r19063253
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -62,6 +59,20 @@ private[spark] class TorrentBroadcast[T: ClassTag](
        * blocks from the driver and/or other executors.
        */
       @transient private var _value: T = obj
    +  /** The compression codec to use, or None if compression is disabled */
    +  @transient private var compressionCodec: Option[CompressionCodec] = _
    +  /** Size of each block. Default value is 4MB.  This value is only read by the broadcaster. */
    +  @transient private var blockSize: Int = _
    --- End diff --
    
    How about moving these two into the constructor, reading the conf in TorrentBroadcastFactory?



[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2844#discussion_r19130571
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -227,6 +217,7 @@ private object TorrentBroadcast extends Logging {
        * If removeFromDriver is true, also remove these persisted blocks on the driver.
        */
       def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = {
    +    logInfo(s"Unpersisting TorrentBroadcast $id")
    --- End diff --
    
    I don't feel super strongly about this one, but given that this is for "debugging" exceptional cases, I feel it should be at debug level. If your worry is that the broadcast cleaner might clean up stuff prematurely, then I think we should log in the cleaner instead.



[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2844#issuecomment-59890238
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21974/
    Test PASSed.



[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2844#discussion_r19118878
  
    --- Diff: core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala ---
    @@ -17,13 +17,18 @@
     
     package org.apache.spark.broadcast
     
    +import scala.util.Random
    +
    +import org.scalacheck.Gen
    --- End diff --
    
    This is from ScalaCheck; see http://www.scalatest.org/user_guide/generator_driven_property_checks
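
    For anyone unfamiliar with it, a minimal example of the generator-driven style
    (a sketch assuming ScalaTest 2.x with ScalaCheck on the test classpath; not the
    actual BroadcastSuite code):
    
    ```scala
    import org.scalacheck.Gen
    import org.scalatest.FunSuite
    import org.scalatest.prop.GeneratorDrivenPropertyChecks

    class RoundTripSketch extends FunSuite with GeneratorDrivenPropertyChecks {
      test("reversing a list twice is the identity") {
        // Gen describes how to produce random inputs; forAll runs the body over many samples.
        val lists = Gen.listOf(Gen.choose(0, 1024))
        forAll(lists) { (xs: List[Int]) =>
          assert(xs.reverse.reverse === xs)
        }
      }
    }
    ```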



[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2844#issuecomment-59655954
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21893/consoleFull) for   PR 2844 at commit [`c3b08f9`](https://github.com/apache/spark/commit/c3b08f93b61f0748b7c42fc32314bd92150e5b88).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.



[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/2844#issuecomment-59641699
  
    /cc @rxin for review.  I'd like to apply this to `branch-1.1` as well, since I believe that it's also affected by current TorrentBroadcast bugs.



[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2844#issuecomment-59643974
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21885/
    Test FAILed.



[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2844#issuecomment-59868399
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21953/consoleFull) for   PR 2844 at commit [`2a9fdfd`](https://github.com/apache/spark/commit/2a9fdfd6bfb6ae1f952d52162d9687e058159282).
     * This patch merges cleanly.



[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2844#issuecomment-59653640
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21893/consoleFull) for   PR 2844 at commit [`c3b08f9`](https://github.com/apache/spark/commit/c3b08f93b61f0748b7c42fc32314bd92150e5b88).
     * This patch merges cleanly.



[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2844#issuecomment-59645137
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21888/consoleFull) for   PR 2844 at commit [`5c22782`](https://github.com/apache/spark/commit/5c227825b3cf0bbe3826e20fe66370229bfc43a2).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.



[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2844#discussion_r19063336
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -76,23 +87,20 @@ private[spark] class TorrentBroadcast[T: ClassTag](
        * @return number of blocks this broadcast variable is divided into
        */
       private def writeBlocks(): Int = {
    -    // For local mode, just put the object in the BlockManager so we can find it later.
    -    SparkEnv.get.blockManager.putSingle(
    -      broadcastId, _value, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
    -
    -    if (!isLocal) {
    -      val blocks = TorrentBroadcast.blockifyObject(_value)
    -      blocks.zipWithIndex.foreach { case (block, i) =>
    -        SparkEnv.get.blockManager.putBytes(
    -          BroadcastBlockId(id, "piece" + i),
    -          block,
    -          StorageLevel.MEMORY_AND_DISK_SER,
    -          tellMaster = true)
    -      }
    -      blocks.length
    -    } else {
    -      0
    +    // Store a copy of the broadcast variable in the driver so that tasks run on the driver
    +    // do not create a duplicate copy of the broadcast variable's value.
    +    SparkEnv.get.blockManager.putSingle(broadcastId, _value, StorageLevel.MEMORY_AND_DISK,
    +      tellMaster = false)
    --- End diff --
    
    The reason for this store is to avoid creating two copies of `_value` in the driver.  If we serialize and deserialize a broadcast variable on the driver and then attempt to access its value, then without this code we will end up going through the regular de-chunking code path, which will cause us to deserialize the serialized copy of `_value` and waste memory. 
    
    I believe that this serialization and deserialization can take place when tasks are run in local mode, since we still serialize tasks in order to help users be aware of serialization issues that would impact them if they moved to a cluster.  This complexity is another reason why I'm in favor of just scrapping all local-mode special-casing and configuring Spark to use a dummy LocalBroadcastFactory for local mode instead of whichever setting the user specified.  That would be a larger, more-invasive change, which is why I opted for the simpler fix here.



[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2844#discussion_r19063287
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -156,6 +158,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](
       private def readObject(in: ObjectInputStream) {
         in.defaultReadObject()
         TorrentBroadcast.synchronized {
    +      setConf(SparkEnv.get.conf)
    --- End diff --
    
    The conf is application-scoped.  The same conf should be present on this application's executors, where this task will be deserialized.  This assumption is used elsewhere, too.



[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by shivaram <gi...@git.apache.org>.
Github user shivaram commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2844#discussion_r19119774
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -227,6 +217,7 @@ private object TorrentBroadcast extends Logging {
        * If removeFromDriver is true, also remove these persisted blocks on the driver.
        */
       def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = {
    +    logInfo(s"Unpersisting TorrentBroadcast $id")
    --- End diff --
    
    Actually this is useful for debugging. I'd suggest keeping this at info



[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2844#discussion_r19066407
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -179,43 +183,29 @@ private[spark] class TorrentBroadcast[T: ClassTag](
     
     
     private object TorrentBroadcast extends Logging {
    -  /** Size of each block. Default value is 4MB. */
    -  private lazy val BLOCK_SIZE = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
    -  private var initialized = false
    -  private var conf: SparkConf = null
    -  private var compress: Boolean = false
    -  private var compressionCodec: CompressionCodec = null
    -
    -  def initialize(_isDriver: Boolean, conf: SparkConf) {
    -    TorrentBroadcast.conf = conf // TODO: we might have to fix it in tests
    -    synchronized {
    -      if (!initialized) {
    -        compress = conf.getBoolean("spark.broadcast.compress", true)
    -        compressionCodec = CompressionCodec.createCodec(conf)
    -        initialized = true
    -      }
    -    }
    -  }
     
    -  def stop() {
    -    initialized = false
    -  }
    -
    -  def blockifyObject[T: ClassTag](obj: T): Array[ByteBuffer] = {
    -    val bos = new ByteArrayChunkOutputStream(BLOCK_SIZE)
    -    val out: OutputStream = if (compress) compressionCodec.compressedOutputStream(bos) else bos
    -    val ser = SparkEnv.get.serializer.newInstance()
    +  def blockifyObject[T: ClassTag](
    --- End diff --
    
    These two methods, `blockifyObject` and `unBlockifyObject`, now accept all of their dependencies directly, which makes it easier to unit-test them.



[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by shivaram <gi...@git.apache.org>.
Github user shivaram commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2844#discussion_r19130957
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -227,6 +217,7 @@ private object TorrentBroadcast extends Logging {
        * If removeFromDriver is true, also remove these persisted blocks on the driver.
        */
       def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = {
    +    logInfo(s"Unpersisting TorrentBroadcast $id")
    --- End diff --
    
    It's mostly for debugging which broadcasts have been removed and which have not. It can probably be made debug once we have a UI for this (#2851), but right now looking at the driver logs is the only way to figure out whether a broadcast variable has been removed.
    Also, it's just one line per broadcast variable (we have 2-3 lines per variable when it is created).



[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2844#discussion_r19119178
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -104,29 +112,23 @@ private[spark] class TorrentBroadcast[T: ClassTag](
     
         for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
           val pieceId = BroadcastBlockId(id, "piece" + pid)
    -
    -      // First try getLocalBytes because  there is a chance that previous attempts to fetch the
    +      logDebug(s"Reading piece $pieceId of $broadcastId")
    +      // First try getLocalBytes because there is a chance that previous attempts to fetch the
           // broadcast blocks have already fetched some of the blocks. In that case, some blocks
           // would be available locally (on this executor).
    -      var blockOpt = bm.getLocalBytes(pieceId)
    -      if (!blockOpt.isDefined) {
    -        blockOpt = bm.getRemoteBytes(pieceId)
    -        blockOpt match {
    -          case Some(block) =>
    -            // If we found the block from remote executors/driver's BlockManager, put the block
    -            // in this executor's BlockManager.
    -            SparkEnv.get.blockManager.putBytes(
    -              pieceId,
    -              block,
    -              StorageLevel.MEMORY_AND_DISK_SER,
    -              tellMaster = true)
    -
    -          case None =>
    -            throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
    -        }
    +      val block: ByteBuffer = bm.getLocalBytes(pieceId).getOrElse {
    +        bm.getRemoteBytes(pieceId).map { block =>
    --- End diff --
    
    To be more explicit: I'm suggesting that the old style is easier to understand.



[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2844#issuecomment-59643009
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21885/consoleFull) for   PR 2844 at commit [`33fc754`](https://github.com/apache/spark/commit/33fc75447c676a5fca1f6f7e7095562f3a1583d5).
     * This patch merges cleanly.



[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/2844#issuecomment-59891101
  
    I've merged this into master.



[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/2844



[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2844#discussion_r19063222
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -76,23 +87,20 @@ private[spark] class TorrentBroadcast[T: ClassTag](
        * @return number of blocks this broadcast variable is divided into
        */
       private def writeBlocks(): Int = {
    -    // For local mode, just put the object in the BlockManager so we can find it later.
    -    SparkEnv.get.blockManager.putSingle(
    -      broadcastId, _value, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
    -
    -    if (!isLocal) {
    -      val blocks = TorrentBroadcast.blockifyObject(_value)
    -      blocks.zipWithIndex.foreach { case (block, i) =>
    -        SparkEnv.get.blockManager.putBytes(
    -          BroadcastBlockId(id, "piece" + i),
    -          block,
    -          StorageLevel.MEMORY_AND_DISK_SER,
    -          tellMaster = true)
    -      }
    -      blocks.length
    -    } else {
    -      0
    +    // Store a copy of the broadcast variable in the driver so that tasks run on the driver
    +    // do not create a duplicate copy of the broadcast variable's value.
    +    SparkEnv.get.blockManager.putSingle(broadcastId, _value, StorageLevel.MEMORY_AND_DISK,
    +      tellMaster = false)
    --- End diff --
    
    I wonder whether storing a serialized copy in local mode helps at all. If it fails to fetch the original copy of the value from the BlockManager, it will also fail to fetch the serialized copy.



[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/2844#issuecomment-59653126
  
    This most recent test-failure is another side-effect of removing TorrentBroadcast's optimizations for local mode:
    
    ```
    [info] - Unpersisting TorrentBroadcast on executors only in local mode *** FAILED ***
    [info]   1 did not equal 0 (BroadcastSuite.scala:219)
    [info] - Unpersisting TorrentBroadcast on executors and driver in local mode *** FAILED ***
    [info]   1 did not equal 0 (BroadcastSuite.scala:219)
    ```
    
    This time, the error is because there's a check that asserts that broadcast pieces are not stored in the driver's block manager when running in local mode.  I don't think that this optimization necessarily makes sense, since we'll have to store those blocks anyway when running in distributed mode.  Therefore, I'm going to change these tests to remove this local-mode special-casing.
    




[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2844#discussion_r19119195
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -62,6 +59,20 @@ private[spark] class TorrentBroadcast[T: ClassTag](
        * blocks from the driver and/or other executors.
        */
       @transient private var _value: T = obj
    +  /** The compression codec to use, or None if compression is disabled */
    +  @transient private var compressionCodec: Option[CompressionCodec] = _
    +  /** Size of each block. Default value is 4MB.  This value is only read by the broadcaster. */
    +  @transient private var blockSize: Int = _
    +
    +  private def setConf(conf: SparkConf) {
    +    compressionCodec = if (conf.getBoolean("spark.broadcast.compress", true)) {
    +      Some(CompressionCodec.createCodec(conf))
    +    } else {
    +      None
    +    }
    +    blockSize = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
    +  }
    +  setConf(SparkEnv.get.conf)
    --- End diff --
    
    Update the javadoc for this class to make it very obvious that, at init time, it reads its configuration from SparkEnv.get.conf.



[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2844#issuecomment-59643971
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21885/consoleFull) for   PR 2844 at commit [`33fc754`](https://github.com/apache/spark/commit/33fc75447c676a5fca1f6f7e7095562f3a1583d5).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.



[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2844#discussion_r19130611
  
    --- Diff: core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala ---
    @@ -84,6 +89,24 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
         assert(results.collect().toSet === (1 to numSlaves).map(x => (x, 10)).toSet)
       }
     
    +  test("TorrentBroadcast's blockifyObject and unblockifyObject are inverses") {
    +    import org.apache.spark.broadcast.TorrentBroadcast._
    +    val blockSize = 1024
    +    val conf = new SparkConf()
    +    val compressionCodec = Some(new SnappyCompressionCodec(conf))
    +    val serializer = new JavaSerializer(conf)
    +    val objects = for (size <- Gen.choose(1, 1024 * 10)) yield {
    --- End diff --
    
    as discussed offline, maybe just use a random number generator here since Gen brings extra complexity but not much benefit in this specific case.
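
    For illustration, a plain-Random version of that generator might look like the
    sketch below (not the committed test; sizes and seed are arbitrary):
    
    ```scala
    // Generate a handful of randomly sized byte arrays with scala.util.Random instead of Gen.
    val rand = new scala.util.Random(42)  // fixed seed keeps the test deterministic
    val objects: Seq[Array[Byte]] = (1 to 10).map { _ =>
      val bytes = new Array[Byte](1 + rand.nextInt(1024 * 10))
      rand.nextBytes(bytes)
      bytes
    }
    ```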



[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2844#issuecomment-59884782
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21974/consoleFull) for   PR 2844 at commit [`1e8268d`](https://github.com/apache/spark/commit/1e8268d6111e4ad45e2acfe47d837718f2170461).
     * This patch merges cleanly.



[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2844#discussion_r19126189
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -104,29 +112,23 @@ private[spark] class TorrentBroadcast[T: ClassTag](
     
         for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
           val pieceId = BroadcastBlockId(id, "piece" + pid)
    -
    -      // First try getLocalBytes because  there is a chance that previous attempts to fetch the
    +      logDebug(s"Reading piece $pieceId of $broadcastId")
    +      // First try getLocalBytes because there is a chance that previous attempts to fetch the
           // broadcast blocks have already fetched some of the blocks. In that case, some blocks
           // would be available locally (on this executor).
    -      var blockOpt = bm.getLocalBytes(pieceId)
    -      if (!blockOpt.isDefined) {
    -        blockOpt = bm.getRemoteBytes(pieceId)
    -        blockOpt match {
    -          case Some(block) =>
    -            // If we found the block from remote executors/driver's BlockManager, put the block
    -            // in this executor's BlockManager.
    -            SparkEnv.get.blockManager.putBytes(
    -              pieceId,
    -              block,
    -              StorageLevel.MEMORY_AND_DISK_SER,
    -              tellMaster = true)
    -
    -          case None =>
    -            throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
    -        }
    +      val block: ByteBuffer = bm.getLocalBytes(pieceId).getOrElse {
    +        bm.getRemoteBytes(pieceId).map { block =>
    --- End diff --
    
    I pushed a new commit that simplifies this code.  I think that the problem was the use of nested getOrElse calls.  I replaced this with a series of `defs` that show how to get the bytes locally and remotely, followed by a non-nested `orElse` chain.  I think this is a lot cleaner now, since the core logic is a one-liner:
    
    ```scala
    getLocal.orElse(getRemote).getOrElse(
            throw new SparkException(s"Failed to get $pieceId of $broadcastId"))
    ```
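    
    For context, the surrounding defs might look roughly like this (a sketch following
    the names in the quoted diff above, not necessarily the committed code):
    
    ```scala
    // Sketch only: bm, pieceId, and broadcastId are the names used in the quoted diff.
    def getLocal: Option[ByteBuffer] = bm.getLocalBytes(pieceId)
    def getRemote: Option[ByteBuffer] = bm.getRemoteBytes(pieceId).map { block =>
      // Cache pieces fetched from remote executors/driver so later reads find them locally.
      bm.putBytes(pieceId, block, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
      block
    }
    val block: ByteBuffer = getLocal.orElse(getRemote).getOrElse(
      throw new SparkException(s"Failed to get $pieceId of $broadcastId"))
    ```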



[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2844#discussion_r19119141
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -104,29 +112,23 @@ private[spark] class TorrentBroadcast[T: ClassTag](
     
         for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
           val pieceId = BroadcastBlockId(id, "piece" + pid)
    -
    -      // First try getLocalBytes because  there is a chance that previous attempts to fetch the
    +      logDebug(s"Reading piece $pieceId of $broadcastId")
    +      // First try getLocalBytes because there is a chance that previous attempts to fetch the
           // broadcast blocks have already fetched some of the blocks. In that case, some blocks
           // would be available locally (on this executor).
    -      var blockOpt = bm.getLocalBytes(pieceId)
    -      if (!blockOpt.isDefined) {
    -        blockOpt = bm.getRemoteBytes(pieceId)
    -        blockOpt match {
    -          case Some(block) =>
    -            // If we found the block from remote executors/driver's BlockManager, put the block
    -            // in this executor's BlockManager.
    -            SparkEnv.get.blockManager.putBytes(
    -              pieceId,
    -              block,
    -              StorageLevel.MEMORY_AND_DISK_SER,
    -              tellMaster = true)
    -
    -          case None =>
    -            throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
    -        }
    +      val block: ByteBuffer = bm.getLocalBytes(pieceId).getOrElse {
    +        bm.getRemoteBytes(pieceId).map { block =>
    --- End diff --
    
    given this block is long, can we avoid using map.getOrElse? Just make it more explicit



[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2844#discussion_r19126192
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -62,6 +59,20 @@ private[spark] class TorrentBroadcast[T: ClassTag](
        * blocks from the driver and/or other executors.
        */
       @transient private var _value: T = obj
    +  /** The compression codec to use, or None if compression is disabled */
    +  @transient private var compressionCodec: Option[CompressionCodec] = _
    +  /** Size of each block. Default value is 4MB.  This value is only read by the broadcaster. */
    +  @transient private var blockSize: Int = _
    +
    +  private def setConf(conf: SparkConf) {
    +    compressionCodec = if (conf.getBoolean("spark.broadcast.compress", true)) {
    +      Some(CompressionCodec.createCodec(conf))
    +    } else {
    +      None
    +    }
    +    blockSize = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
    +  }
    +  setConf(SparkEnv.get.conf)
    --- End diff --
    
    Done.



[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2844#discussion_r19063271
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -156,6 +158,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](
       private def readObject(in: ObjectInputStream) {
         in.defaultReadObject()
         TorrentBroadcast.synchronized {
    +      setConf(SparkEnv.get.conf)
    --- End diff --
    
    This looks weird; how can we make sure that this conf equals the one that was used when the Broadcast was created?



[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2844#issuecomment-59645141
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21888/
    Test FAILed.



[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by shivaram <gi...@git.apache.org>.
Github user shivaram commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2844#discussion_r19121626
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -104,29 +112,23 @@ private[spark] class TorrentBroadcast[T: ClassTag](
     
         for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
           val pieceId = BroadcastBlockId(id, "piece" + pid)
    -
    -      // First try getLocalBytes because  there is a chance that previous attempts to fetch the
    +      logDebug(s"Reading piece $pieceId of $broadcastId")
    +      // First try getLocalBytes because there is a chance that previous attempts to fetch the
           // broadcast blocks have already fetched some of the blocks. In that case, some blocks
           // would be available locally (on this executor).
    -      var blockOpt = bm.getLocalBytes(pieceId)
    -      if (!blockOpt.isDefined) {
    -        blockOpt = bm.getRemoteBytes(pieceId)
    -        blockOpt match {
    -          case Some(block) =>
    -            // If we found the block from remote executors/driver's BlockManager, put the block
    -            // in this executor's BlockManager.
    -            SparkEnv.get.blockManager.putBytes(
    -              pieceId,
    -              block,
    -              StorageLevel.MEMORY_AND_DISK_SER,
    -              tellMaster = true)
    -
    -          case None =>
    -            throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
    -        }
    +      val block: ByteBuffer = bm.getLocalBytes(pieceId).getOrElse {
    +        bm.getRemoteBytes(pieceId).map { block =>
    --- End diff --
    
    Hmm -- The thing I want the code to reflect is that there are three cases
    1. We get it locally
    2. If not, we get it from remote
    3. If that fails, we throw an exception. 
    
    Right now it looks like one big block instead of this three-way switch.



[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2844#issuecomment-59642634
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21882/consoleFull) for   PR 2844 at commit [`618a872`](https://github.com/apache/spark/commit/618a87260faaebf353c1d9b4abc17af9f0cfa472).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.



[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2844#issuecomment-59655958
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21893/
    Test PASSed.



[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2844#issuecomment-59642636
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21882/
    Test FAILed.



[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2844#issuecomment-59641757
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21882/consoleFull) for PR 2844 at commit [`618a872`](https://github.com/apache/spark/commit/618a87260faaebf353c1d9b4abc17af9f0cfa472).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2844#discussion_r19131009
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -227,6 +217,7 @@ private object TorrentBroadcast extends Logging {
        * If removeFromDriver is true, also remove these persisted blocks on the driver.
        */
       def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = {
    +    logInfo(s"Unpersisting TorrentBroadcast $id")
    --- End diff --
    
    I'll try to get #2851 merged this week; I'm in the middle of some significant UI code cleanup and I'm planning to merge most of the existing UI patches or to re-implement them myself.




[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2844#discussion_r19118588
  
    --- Diff: core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala ---
    @@ -17,13 +17,18 @@
     
     package org.apache.spark.broadcast
     
    +import scala.util.Random
    +
    +import org.scalacheck.Gen
    --- End diff --
    
    What does this do?
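
For background on the question above: `org.scalacheck.Gen` describes generators of random test inputs; property-style tests draw their inputs from a `Gen` instead of hard-coding fixtures. A minimal sketch, with made-up size bounds, of a generator that could feed a blockify/unblockify round-trip check:

```scala
import org.scalacheck.{Arbitrary, Gen}

// A generator of byte arrays of varying length (the bounds are arbitrary here).
val payloads: Gen[Array[Byte]] =
  Gen.choose(0, 64 * 1024).flatMap { n =>
    Gen.listOfN(n, Arbitrary.arbitrary[Byte]).map(_.toArray)
  }

// Drawing a single sample by hand; a forAll-style property check would
// normally do this repeatedly and shrink failing inputs automatically.
val sample: Option[Array[Byte]] = payloads.sample
```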




[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2844#discussion_r19063455
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -179,43 +183,29 @@ private[spark] class TorrentBroadcast[T: ClassTag](
     
     
     private object TorrentBroadcast extends Logging {
    -  /** Size of each block. Default value is 4MB. */
    -  private lazy val BLOCK_SIZE = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
    -  private var initialized = false
    -  private var conf: SparkConf = null
    -  private var compress: Boolean = false
    -  private var compressionCodec: CompressionCodec = null
    -
    -  def initialize(_isDriver: Boolean, conf: SparkConf) {
    -    TorrentBroadcast.conf = conf // TODO: we might have to fix it in tests
    -    synchronized {
    -      if (!initialized) {
    -        compress = conf.getBoolean("spark.broadcast.compress", true)
    -        compressionCodec = CompressionCodec.createCodec(conf)
    -        initialized = true
    -      }
    -    }
    -  }
     
    -  def stop() {
    -    initialized = false
    -  }
    -
    -  def blockifyObject[T: ClassTag](obj: T): Array[ByteBuffer] = {
    -    val bos = new ByteArrayChunkOutputStream(BLOCK_SIZE)
    -    val out: OutputStream = if (compress) compressionCodec.compressedOutputStream(bos) else bos
    -    val ser = SparkEnv.get.serializer.newInstance()
    +  def blockifyObject[T: ClassTag](
    --- End diff --
    
    The conf has been moved into `class Broadcast`; maybe blockifyObject and unBlockifyObject should also be moved.
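
The quoted diff is truncated before the new parameter list, but the shape of the change under discussion (passing the block size, serializer, and compression codec in explicitly rather than reading object-level state) might look roughly like the sketch below; the exact parameter names, order, and imports are assumptions:

```scala
import java.io.OutputStream
import java.nio.ByteBuffer

import scala.reflect.ClassTag

import org.apache.spark.io.CompressionCodec
import org.apache.spark.serializer.Serializer
import org.apache.spark.util.io.ByteArrayChunkOutputStream

private object BlockifySketch {
  // Sketch: every input the helper needs is an explicit argument, so the
  // companion object keeps no mutable state and each TorrentBroadcast
  // instance can supply values derived from its own SparkEnv conf.
  def blockifyObject[T: ClassTag](
      obj: T,
      blockSize: Int,
      serializer: Serializer,
      compressionCodec: Option[CompressionCodec]): Array[ByteBuffer] = {
    val bos = new ByteArrayChunkOutputStream(blockSize)
    val out: OutputStream =
      compressionCodec.map(_.compressedOutputStream(bos)).getOrElse(bos)
    val ser = serializer.newInstance()
    val serOut = ser.serializeStream(out)
    serOut.writeObject[T](obj).close()
    bos.toArrays.map(b => ByteBuffer.wrap(b))
  }
}
```

Because nothing in such a helper reads `SparkEnv` or a `SparkConf` directly, it stays easy to unit-test with different block sizes and codecs.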




[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/2844#issuecomment-59643838
  
    It looks like this build is going to fail a ReplSuite test:
    
    ```scala
    test("broadcast vars") {
        // Test that the value that a broadcast var had when it was created is used,
        // even if that variable is then modified in the driver program
        // TODO: This doesn't actually work for arrays when we run in local mode!
        val output = runInterpreter("local",
          """
            |var array = new Array[Int](5)
            |val broadcastArray = sc.broadcast(array)
            |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
            |array(0) = 5
            |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
          """.stripMargin)
        assertDoesNotContain("error:", output)
        assertDoesNotContain("Exception", output)
        assertContains("res0: Array[Int] = Array(0, 0, 0, 0, 0)", output)
        assertContains("res2: Array[Int] = Array(5, 0, 0, 0, 0)", output)
      }
    ```
    
    I see now that my change to remove the special local-mode handling inadvertently leads to a duplication of the broadcast variable in the driver program. That could be a performance problem, since we would now use 2x the memory in the driver for each broadcast variable. I'll restore the line that stores a local copy of the broadcast variable when it is created.
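
One plausible shape for the restored line, shown only as a sketch (the actual fix may differ); `value` stands in for the broadcast's value, and `putSingle` is used here to keep a single deserialized copy in the local BlockManager rather than the serialized pieces:

```scala
// Sketch: keep a deserialized copy on the driver when the broadcast is
// created, so local-mode tasks read back that object instead of
// re-assembling and deserializing a second copy from the blocks.
SparkEnv.get.blockManager.putSingle(
  broadcastId, value, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
```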




[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2844#discussion_r19118825
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -227,6 +217,7 @@ private object TorrentBroadcast extends Logging {
        * If removeFromDriver is true, also remove these persisted blocks on the driver.
        */
       def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = {
    +    logInfo(s"Unpersisting TorrentBroadcast $id")
    --- End diff --
    
    This can be chatty; logDebug?




[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2844#discussion_r19120243
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -104,29 +112,23 @@ private[spark] class TorrentBroadcast[T: ClassTag](
     
         for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
           val pieceId = BroadcastBlockId(id, "piece" + pid)
    -
    -      // First try getLocalBytes because  there is a chance that previous attempts to fetch the
    +      logDebug(s"Reading piece $pieceId of $broadcastId")
    +      // First try getLocalBytes because there is a chance that previous attempts to fetch the
           // broadcast blocks have already fetched some of the blocks. In that case, some blocks
           // would be available locally (on this executor).
    -      var blockOpt = bm.getLocalBytes(pieceId)
    -      if (!blockOpt.isDefined) {
    -        blockOpt = bm.getRemoteBytes(pieceId)
    -        blockOpt match {
    -          case Some(block) =>
    -            // If we found the block from remote executors/driver's BlockManager, put the block
    -            // in this executor's BlockManager.
    -            SparkEnv.get.blockManager.putBytes(
    -              pieceId,
    -              block,
    -              StorageLevel.MEMORY_AND_DISK_SER,
    -              tellMaster = true)
    -
    -          case None =>
    -            throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
    -        }
    +      val block: ByteBuffer = bm.getLocalBytes(pieceId).getOrElse {
    +        bm.getRemoteBytes(pieceId).map { block =>
    --- End diff --
    
    Would you like me to revert to the old code layout, then? FWIW, I prefer the style here to the old code, which used a `var` and relied on an "if we get here, the option is defined" comment.




[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2844#issuecomment-59890235
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21974/consoleFull) for PR 2844 at commit [`1e8268d`](https://github.com/apache/spark/commit/1e8268d6111e4ad45e2acfe47d837718f2170461).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/2844#issuecomment-59803868
  
    LGTM now, thanks!




[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2844#discussion_r19063363
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -62,6 +59,20 @@ private[spark] class TorrentBroadcast[T: ClassTag](
        * blocks from the driver and/or other executors.
        */
       @transient private var _value: T = obj
    +  /** The compression codec to use, or None if compression is disabled */
    +  @transient private var compressionCodec: Option[CompressionCodec] = _
    +  /** Size of each block. Default value is 4MB.  This value is only read by the broadcaster. */
    +  @transient private var blockSize: Int = _
    --- End diff --
    
    I thought about this and agree that it might be cleaner, but it would require more refactoring of other code. One design goal here was to minimize the serialized size of TorrentBroadcast objects, so we can't serialize the SparkConf or CompressionCodec instances (which contain SparkConfs). SparkEnv's conf determines these values anyway.
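
A sketch of how those `@transient` fields could be (re)derived from SparkEnv's conf rather than serialized; the `setConf` helper name is an assumption, while the config keys and defaults match the ones removed from the companion object in the diff earlier in this thread:

```scala
// Sketch: re-derive the transient configuration on construction and again
// after deserialization, so neither a SparkConf nor a CompressionCodec is
// ever written out as part of the TorrentBroadcast object.
private def setConf(conf: SparkConf): Unit = {
  compressionCodec = if (conf.getBoolean("spark.broadcast.compress", true)) {
    Some(CompressionCodec.createCodec(conf))
  } else {
    None
  }
  blockSize = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
}
setConf(SparkEnv.get.conf)  // would also be called from readObject()
```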




[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2844#discussion_r19126060
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -227,6 +217,7 @@ private object TorrentBroadcast extends Logging {
        * If removeFromDriver is true, also remove these persisted blocks on the driver.
        */
       def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = {
    +    logInfo(s"Unpersisting TorrentBroadcast $id")
    --- End diff --
    
    HttpBroadcast has info-level logging for this.  I'm going to leave this at info for now while we debug TorrentBroadcast issues; we can always revisit later as part of a larger log-level cleanup.




[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by shivaram <gi...@git.apache.org>.
Github user shivaram commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2844#discussion_r19119727
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -104,29 +112,23 @@ private[spark] class TorrentBroadcast[T: ClassTag](
     
         for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
           val pieceId = BroadcastBlockId(id, "piece" + pid)
    -
    -      // First try getLocalBytes because  there is a chance that previous attempts to fetch the
    +      logDebug(s"Reading piece $pieceId of $broadcastId")
    +      // First try getLocalBytes because there is a chance that previous attempts to fetch the
           // broadcast blocks have already fetched some of the blocks. In that case, some blocks
           // would be available locally (on this executor).
    -      var blockOpt = bm.getLocalBytes(pieceId)
    -      if (!blockOpt.isDefined) {
    -        blockOpt = bm.getRemoteBytes(pieceId)
    -        blockOpt match {
    -          case Some(block) =>
    -            // If we found the block from remote executors/driver's BlockManager, put the block
    -            // in this executor's BlockManager.
    -            SparkEnv.get.blockManager.putBytes(
    -              pieceId,
    -              block,
    -              StorageLevel.MEMORY_AND_DISK_SER,
    -              tellMaster = true)
    -
    -          case None =>
    -            throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
    -        }
    +      val block: ByteBuffer = bm.getLocalBytes(pieceId).getOrElse {
    +        bm.getRemoteBytes(pieceId).map { block =>
    --- End diff --
    
    FWIW, I agree with @rxin.




[GitHub] spark pull request: [SPARK-3958] TorrentBroadcast cleanup / debugg...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2844#issuecomment-59644064
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21888/consoleFull) for PR 2844 at commit [`5c22782`](https://github.com/apache/spark/commit/5c227825b3cf0bbe3826e20fe66370229bfc43a2).
     * This patch merges cleanly.

