Posted to reviews@spark.apache.org by rxin <gi...@git.apache.org> on 2014/08/19 08:42:52 UTC

[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

GitHub user rxin opened a pull request:

    https://github.com/apache/spark/pull/2030

    [SPARK-3119] Re-implementation of TorrentBroadcast.

    This is a re-implementation of TorrentBroadcast, with the following changes:
    
    1. Removes most of the mutable, transient state from TorrentBroadcast (e.g. totalBytes, num of blocks fetched).
    2. Removes TorrentInfo and TorrentBlock
    3. Replaces the BlockManager.getSingle call in readObject with a getLocal, resulting in one fewer RPC call to the BlockManagerMasterActor to find the location of the block.
    4. Removes the metadata block, resulting in one fewer block to fetch.
    5. Removes an extra memory copy for deserialization (by using Java's SequenceInputStream).
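
Item 5 can be illustrated with a minimal sketch. It assumes plain Java serialization and byte-array chunks; the object name `ChunkedSerialization` and the methods `blockify` and `unblockify` are hypothetical stand-ins chosen for this example, not the code in the patch. The idea is that a `SequenceInputStream` lets the deserializer read across the chunks in order, so they do not have to be copied into one contiguous array first.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream,
  ObjectInputStream, ObjectOutputStream, SequenceInputStream}

// Hypothetical helper for illustration only; this is not Spark's TorrentBroadcast.
object ChunkedSerialization {

  /** Serialize `obj` with Java serialization and slice the bytes into chunks of at most `blockSize`. */
  def blockify(obj: AnyRef, blockSize: Int): Array[Array[Byte]] = {
    require(blockSize > 0, "blockSize must be positive")
    val bos = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bos)
    out.writeObject(obj)
    out.close()
    bos.toByteArray.grouped(blockSize).toArray
  }

  /** Rebuild the object by streaming over the chunks in order, without concatenating them first. */
  def unblockify[T](blocks: Array[Array[Byte]]): T = {
    val it = blocks.iterator
    // SequenceInputStream takes a java.util.Enumeration, so adapt the Scala iterator by hand.
    val streams = new java.util.Enumeration[InputStream] {
      override def hasMoreElements: Boolean = it.hasNext
      override def nextElement(): InputStream = new ByteArrayInputStream(it.next())
    }
    val in = new ObjectInputStream(new SequenceInputStream(streams))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

Calling `ChunkedSerialization.unblockify[String](ChunkedSerialization.blockify(payload, 64))` should round-trip a string `payload`; the chunk size of 64 is arbitrary.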

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rxin/spark torrentBroadcast

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/2030.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2030
    
----
commit c1185cded8224470f06abb4e345df5ebbd87db68
Author: Reynold Xin <rx...@apache.org>
Date:   2014-08-19T06:40:27Z

    [SPARK-3119] Re-implementation of TorrentBroadcast.
    
    This is a re-implementation of TorrentBroadcast, with the following changes:
    
    1. Removes most of the mutable, transient state from TorrentBroadcast (e.g. totalBytes, num of blocks fetched).
    2. Removes TorrentInfo and TorrentBlock
    3. Replaces the BlockManager.getSingle call in readObject with a getLocal, resulting in one fewer RPC call to the BlockManagerMasterActor to find the location of the block.
    4. Removes the metadata block, resulting in one fewer block to fetch.
    5. Removes an extra memory copy for deserialization (by using Java's SequenceInputStream).

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2030#discussion_r16434140
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -18,50 +18,116 @@
     package org.apache.spark.broadcast
     
     import java.io._
    +import java.nio.ByteBuffer
     
    +import scala.collection.JavaConversions.asJavaEnumeration
     import scala.reflect.ClassTag
     import scala.util.Random
     
     import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
     import org.apache.spark.io.CompressionCodec
     import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
    +import org.apache.spark.util.ByteBufferInputStream
     
     /**
    - *  A [[org.apache.spark.broadcast.Broadcast]] implementation that uses a BitTorrent-like
    - *  protocol to do a distributed transfer of the broadcasted data to the executors.
    - *  The mechanism is as follows. The driver divides the serializes the broadcasted data,
    - *  divides it into smaller chunks, and stores them in the BlockManager of the driver.
    - *  These chunks are reported to the BlockManagerMaster so that all the executors can
    - *  learn the location of those chunks. The first time the broadcast variable (sent as
    - *  part of task) is deserialized at a executor, all the chunks are fetched using
    - *  the BlockManager. When all the chunks are fetched (initially from the driver's
    - *  BlockManager), they are combined and deserialized to recreate the broadcasted data.
    - *  However, the chunks are also stored in the BlockManager and reported to the
    - *  BlockManagerMaster. As more executors fetch the chunks, BlockManagerMaster learns
    - *  multiple locations for each chunk. Hence, subsequent fetches of each chunk will be
    - *  made to other executors who already have those chunks, resulting in a distributed
    - *  fetching. This prevents the driver from being the bottleneck in sending out multiple
    - *  copies of the broadcast data (one per executor) as done by the
    - *  [[org.apache.spark.broadcast.HttpBroadcast]].
    + * A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]].
    + *
    + * The mechanism is as follows:
    + *
    + * The driver divides the serialized object into small chunks and
    + * stores those chunks in the BlockManager of the driver.
    + *
    + * On each executor, the executor first attempts to fetch the object from its BlockManager. If
    + * it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or
    + * other executors if available. Once it gets the chunks, it puts the chunks in its own
    + * BlockManager, ready for other executors to fetch from.
    + *
    + * This prevents the driver from being the bottleneck in sending out multiple copies of the
    + * broadcast data (one per executor) as done by the [[org.apache.spark.broadcast.HttpBroadcast]].
    + *
    + * @param obj object to broadcast
    + * @param isLocal whether Spark is running in local mode (single JVM process).
    + * @param id A unique identifier for the broadcast variable.
      */
     private[spark] class TorrentBroadcast[T: ClassTag](
    -    @transient var value_ : T, isLocal: Boolean, id: Long)
    +    obj : T,
    --- End diff --
    
    got it, i removed my comment when i noticed how it is used. thx
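
For readers following the new Scaladoc in the diff above, the fetch path it describes can be sketched roughly as follows. This is a toy model under stated assumptions: `Node`, `pieceKey`, `writeBlocks` and `readBlocks` here are illustrative stand-ins backed by an in-memory map, not Spark's BlockManager API, and the randomized piece order is an assumption of this sketch (the diff imports `scala.util.Random`, but its use is not shown in the quoted lines).

```scala
import scala.util.Random

// Illustrative stand-ins only; none of these types are Spark classes.
object TorrentFetchSketch {

  /** A toy block store keyed by piece name, standing in for one node's BlockManager. */
  final class Node(val name: String) {
    val store: scala.collection.mutable.Map[String, Array[Byte]] =
      scala.collection.mutable.Map.empty
  }

  // Hypothetical key format for the i-th piece of broadcast `id`.
  def pieceKey(id: Long, i: Int): String = s"broadcast_${id}_piece$i"

  /** Driver side: split the serialized bytes and keep every piece locally. Returns the piece count. */
  def writeBlocks(driver: Node, id: Long, bytes: Array[Byte], blockSize: Int): Int = {
    val pieces = bytes.grouped(blockSize).toVector
    pieces.zipWithIndex.foreach { case (piece, i) => driver.store(pieceKey(id, i)) = piece }
    pieces.length
  }

  /** Executor side: use a local piece when present, otherwise pull it from any peer and cache it. */
  def readBlocks(self: Node, peers: Seq[Node], id: Long, numBlocks: Int, rnd: Random): Array[Byte] = {
    val fetched = new Array[Array[Byte]](numBlocks)
    // Assumption of this sketch: visit pieces in random order to spread load across holders.
    for (i <- rnd.shuffle((0 until numBlocks).toList)) {
      val key = pieceKey(id, i)
      val piece = self.store.get(key)
        .orElse(rnd.shuffle(peers.toList).flatMap(_.store.get(key)).headOption)
        .getOrElse(throw new NoSuchElementException(s"Failed to get $key"))
      self.store(key) = piece // other executors can now fetch this piece from us
      fetched(i) = piece
    }
    fetched.flatten
  }
}
```

Once one executor has read the broadcast, a second executor can be served from the first one alone, which is the property that keeps the driver from becoming the bottleneck.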




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2030#discussion_r16434184
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -18,50 +18,116 @@
     package org.apache.spark.broadcast
     
     import java.io._
    +import java.nio.ByteBuffer
     
    +import scala.collection.JavaConversions.asJavaEnumeration
     import scala.reflect.ClassTag
     import scala.util.Random
     
     import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
     import org.apache.spark.io.CompressionCodec
     import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
    +import org.apache.spark.util.ByteBufferInputStream
     
     /**
    - *  A [[org.apache.spark.broadcast.Broadcast]] implementation that uses a BitTorrent-like
    - *  protocol to do a distributed transfer of the broadcasted data to the executors.
    - *  The mechanism is as follows. The driver divides the serializes the broadcasted data,
    - *  divides it into smaller chunks, and stores them in the BlockManager of the driver.
    - *  These chunks are reported to the BlockManagerMaster so that all the executors can
    - *  learn the location of those chunks. The first time the broadcast variable (sent as
    - *  part of task) is deserialized at a executor, all the chunks are fetched using
    - *  the BlockManager. When all the chunks are fetched (initially from the driver's
    - *  BlockManager), they are combined and deserialized to recreate the broadcasted data.
    - *  However, the chunks are also stored in the BlockManager and reported to the
    - *  BlockManagerMaster. As more executors fetch the chunks, BlockManagerMaster learns
    - *  multiple locations for each chunk. Hence, subsequent fetches of each chunk will be
    - *  made to other executors who already have those chunks, resulting in a distributed
    - *  fetching. This prevents the driver from being the bottleneck in sending out multiple
    - *  copies of the broadcast data (one per executor) as done by the
    - *  [[org.apache.spark.broadcast.HttpBroadcast]].
    + * A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]].
    + *
    + * The mechanism is as follows:
    + *
    + * The driver divides the serialized object into small chunks and
    + * stores those chunks in the BlockManager of the driver.
    + *
    + * On each executor, the executor first attempts to fetch the object from its BlockManager. If
    + * it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or
    + * other executors if available. Once it gets the chunks, it puts the chunks in its own
    + * BlockManager, ready for other executors to fetch from.
    + *
    + * This prevents the driver from being the bottleneck in sending out multiple copies of the
    + * broadcast data (one per executor) as done by the [[org.apache.spark.broadcast.HttpBroadcast]].
    + *
    + * @param obj object to broadcast
    + * @param isLocal whether Spark is running in local mode (single JVM process).
    + * @param id A unique identifier for the broadcast variable.
      */
     private[spark] class TorrentBroadcast[T: ClassTag](
    -    @transient var value_ : T, isLocal: Boolean, id: Long)
    +    obj : T,
    +    @transient private val isLocal: Boolean,
    +    id: Long)
       extends Broadcast[T](id) with Logging with Serializable {
     
    -  override protected def getValue() = value_
    +  /**
    +   * Value of the broadcast object. On driver, this is set directly by the constructor.
    +   * On executors, this is reconstructed by [[readObject]], which builds this value by reading
    +   * blocks from the driver and/or other executors.
    +   */
    +  @transient private var _value: T = obj
    --- End diff --
    
    In general, _ is just meh to use in variable names: it causes unnecessary bugs (_.var vs _var, etc.).
    Unfortunately, I have seen it in a bunch of places in Spark.
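
A small, deliberately contrived sketch of the hazard mentioned here. All names (`UnderscoreNames`, `Cell`, `viaPlaceholder`, `viaMember`) are hypothetical; the point is only that `_.value` and `_value` differ by one character, both compile in this setup, and they give different results.

```scala
// Hypothetical names, chosen only to show how close the two spellings are.
object UnderscoreNames {
  final case class Cell(value: Int)

  // A member with a leading underscore, in the style of `_value` from the diff.
  private val _value: Cell => Int = cell => cell.value + 1

  // Placeholder syntax: `_.value` reads the field of each element.
  def viaPlaceholder(cells: Seq[Cell]): Seq[Int] = cells.map(_.value)

  // One dot fewer: `_value` passes the member above as the mapping function.
  def viaMember(cells: Seq[Cell]): Seq[Int] = cells.map(_value)
}
```

In most real code the two spellings would not both type-check, so the compiler catches the slip; the readability cost in review remains.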




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52604677
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18825/consoleFull) for   PR 2030 at commit [`2d6a5fb`](https://github.com/apache/spark/commit/2d6a5fb19119402254f277bdfaba373d32444612).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by dbtsai <gi...@git.apache.org>.
Github user dbtsai commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-58183559
  
    We built against Spark master on Oct 2, and when we ran our application with around 600GB of data, we got the following exception. Does this PR fix this issue, which was seen by @JoshRosen?
    
        Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 8312, ams03-002.ff): java.io.IOException: PARSING_ERROR(2)
            org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
            org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
            org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:594)
            org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:125)
            org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88)
            org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:58)
            org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128)
            org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1004)
            org.apache.spark.storage.ShuffleBlockFetcherIterator$$anon$1$$anonfun$onBlockFetchSuccess$1.apply(ShuffleBlockFetcherIterator.scala:116)
            org.apache.spark.storage.ShuffleBlockFetcherIterator$$anon$1$$anonfun$onBlockFetchSuccess$1.apply(ShuffleBlockFetcherIterator.scala:115)
            org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:243)
            org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:52)
            scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
            org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
            org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
            org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:89)
            org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44)
            org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
            org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
            org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
            org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
            org.apache.spark.scheduler.Task.run(Task.scala:56)
            org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
            java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
            java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
            java.lang.Thread.run(Thread.java:744)
    Driver stacktrace:





[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by dbtsai <gi...@git.apache.org>.
Github user dbtsai commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-58214186
  
    I thought it was a closed issue, so I moved my comment to JIRA. I ran into
    this issue in spark-shell, not in a standalone application. Does SPARK-3762
    apply in this situation? Thanks.
    
    On Oct 7, 2014 5:17 PM, "Davies Liu" <no...@github.com> wrote:
    
    > It could be fixed by https://github.com/apache/spark/pull/2624
    >
    > It's strange that I can not see this comment on PR #2030.
    >
    > --
    > - Davies
    >
    > —
    > Reply to this email directly or view it on GitHub
    > <https://github.com/apache/spark/pull/2030#issuecomment-58201237>.
    >




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52604743
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18833/consoleFull) for   PR 2030 at commit [`5bacb9d`](https://github.com/apache/spark/commit/5bacb9dbfab4ae5c83eb1874bd6fd6ae87ff4ad6).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2030#discussion_r16431974
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -18,50 +18,116 @@
     package org.apache.spark.broadcast
     
     import java.io._
    +import java.nio.ByteBuffer
     
    +import scala.collection.JavaConversions.asJavaEnumeration
     import scala.reflect.ClassTag
     import scala.util.Random
     
     import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
     import org.apache.spark.io.CompressionCodec
     import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
    +import org.apache.spark.util.ByteBufferInputStream
     
     /**
    - *  A [[org.apache.spark.broadcast.Broadcast]] implementation that uses a BitTorrent-like
    - *  protocol to do a distributed transfer of the broadcasted data to the executors.
    - *  The mechanism is as follows. The driver divides the serializes the broadcasted data,
    - *  divides it into smaller chunks, and stores them in the BlockManager of the driver.
    - *  These chunks are reported to the BlockManagerMaster so that all the executors can
    - *  learn the location of those chunks. The first time the broadcast variable (sent as
    - *  part of task) is deserialized at a executor, all the chunks are fetched using
    - *  the BlockManager. When all the chunks are fetched (initially from the driver's
    - *  BlockManager), they are combined and deserialized to recreate the broadcasted data.
    - *  However, the chunks are also stored in the BlockManager and reported to the
    - *  BlockManagerMaster. As more executors fetch the chunks, BlockManagerMaster learns
    - *  multiple locations for each chunk. Hence, subsequent fetches of each chunk will be
    - *  made to other executors who already have those chunks, resulting in a distributed
    - *  fetching. This prevents the driver from being the bottleneck in sending out multiple
    - *  copies of the broadcast data (one per executor) as done by the
    - *  [[org.apache.spark.broadcast.HttpBroadcast]].
    + * A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]].
    + *
    + * The mechanism is as follows:
    + *
    + * The driver divides the serialized object into small chunks and
    + * stores those chunks in the BlockManager of the driver.
    + *
    + * On each executor, the executor first attempts to fetch the object from its BlockManager. If
    + * it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or
    + * other executors if available. Once it gets the chunks, it puts the chunks in its own
    + * BlockManager, ready for other executors to fetch from.
    + *
    + * This prevents the driver from being the bottleneck in sending out multiple copies of the
    + * broadcast data (one per executor) as done by the [[org.apache.spark.broadcast.HttpBroadcast]].
    + *
    + * @param obj object to broadcast
    + * @param isLocal whether Spark is running in local mode (single JVM process).
    + * @param id A unique identifier for the broadcast variable.
      */
     private[spark] class TorrentBroadcast[T: ClassTag](
    -    @transient var value_ : T, isLocal: Boolean, id: Long)
    +    obj : T,
    --- End diff --
    
    make obj private ?




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by shivaram <gi...@git.apache.org>.
Github user shivaram commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52694150
  
    Thanks Josh -- Perf results look good. LGTM. Can you file JIRAs to track the multi-threading work for 1.2?




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52606590
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18827/consoleFull) for   PR 2030 at commit [`0d8ed5b`](https://github.com/apache/spark/commit/0d8ed5ba059ed530dc7e32ba946f92f827a4467f).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by shivaram <gi...@git.apache.org>.
Github user shivaram commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2030#discussion_r16399641
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -27,41 +29,87 @@ import org.apache.spark.io.CompressionCodec
     import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
     
     /**
    - *  A [[org.apache.spark.broadcast.Broadcast]] implementation that uses a BitTorrent-like
    - *  protocol to do a distributed transfer of the broadcasted data to the executors.
    - *  The mechanism is as follows. The driver divides the serializes the broadcasted data,
    - *  divides it into smaller chunks, and stores them in the BlockManager of the driver.
    - *  These chunks are reported to the BlockManagerMaster so that all the executors can
    - *  learn the location of those chunks. The first time the broadcast variable (sent as
    - *  part of task) is deserialized at a executor, all the chunks are fetched using
    - *  the BlockManager. When all the chunks are fetched (initially from the driver's
    - *  BlockManager), they are combined and deserialized to recreate the broadcasted data.
    - *  However, the chunks are also stored in the BlockManager and reported to the
    - *  BlockManagerMaster. As more executors fetch the chunks, BlockManagerMaster learns
    - *  multiple locations for each chunk. Hence, subsequent fetches of each chunk will be
    - *  made to other executors who already have those chunks, resulting in a distributed
    - *  fetching. This prevents the driver from being the bottleneck in sending out multiple
    - *  copies of the broadcast data (one per executor) as done by the
    - *  [[org.apache.spark.broadcast.HttpBroadcast]].
    + * A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]].
    + *
    + * The mechanism is as follows:
    + *
    + * The driver divides the serialized object into small chunks and
    + * stores those chunks in the BlockManager of the driver.
    + *
    + * On each executor, the executor first attempts to fetch the object from its BlockManager. If
    + * it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or
    + * other executors if available. Once it gets the chunks, it puts the chunks in its own
    + * BlockManager, ready for other executors to fetch from.
    + *
    + * This prevents the driver from being the bottleneck in sending out multiple copies of the
    + * broadcast data (one per executor) as done by the [[org.apache.spark.broadcast.HttpBroadcast]].
    + *
    + * @param obj object to broadcast
    + * @param isLocal whether Spark is running in local mode (single JVM process).
    + * @param id A unique identifier for the broadcast variable.
      */
     private[spark] class TorrentBroadcast[T: ClassTag](
    -    @transient var value_ : T, isLocal: Boolean, id: Long)
    +    obj : T,
    +    @transient private val isLocal: Boolean,
    +    id: Long)
       extends Broadcast[T](id) with Logging with Serializable {
     
    -  override protected def getValue() = value_
    +  override protected def getValue() = _value
    +
    +  /**
    +   * Value of the broadcast object. On driver, this is set directly by the constructor.
    +   * On executors, this is reconstructed by [[readObject]], which builds this value by reading
    +   * blocks from the driver and/or other executors.
    +   */
    +  @transient private var _value: T = obj
    +
    +  /** Total number of blocks this broadcast variable contains. */
    +  private val numBlocks: Int = writeBlocks()
     
       private val broadcastId = BroadcastBlockId(id)
     
    -  SparkEnv.get.blockManager.putSingle(
    -    broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
    +  /**
    +   * Divide the object into multiple blocks and put those blocks in the block manager.
    +   *
    +   * @return number of blocks this broadcast variable is divided into
    +   */
    +  private def writeBlocks(): Int = {
    +    val blocks = TorrentBroadcast.blockifyObject(_value)
    +    blocks.zipWithIndex.foreach { case (block, i) =>
    +      // TODO: Use putBytes directly.
    +      SparkEnv.get.blockManager.putSingle(
    +        BroadcastBlockId(id, "piece" + i),
    +        blocks(i),
    --- End diff --
    
    This should be `block`, given that we did a `zipWithIndex`.
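
A minimal sketch of the point above, outside Spark: after `zipWithIndex`, the element is already bound by the pattern, so indexing back into the collection is redundant. The `ZipWithIndexSketch` object and its `pieceNames` helper are hypothetical names used only for illustration; they are not part of the patch.

```scala
object ZipWithIndexSketch {
  // Hypothetical helper: pairs each block's piece name with its size in bytes.
  def pieceNames(blocks: Array[Array[Byte]]): List[(String, Int)] =
    blocks.zipWithIndex.map { case (block, i) =>
      // Use the bound `block` here rather than looking up `blocks(i)` again.
      ("piece" + i, block.length)
    }.toList
}
```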


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2030#discussion_r16399340
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -27,41 +29,87 @@ import org.apache.spark.io.CompressionCodec
     import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
     
     /**
    - *  A [[org.apache.spark.broadcast.Broadcast]] implementation that uses a BitTorrent-like
    - *  protocol to do a distributed transfer of the broadcasted data to the executors.
    - *  The mechanism is as follows. The driver divides the serializes the broadcasted data,
    - *  divides it into smaller chunks, and stores them in the BlockManager of the driver.
    - *  These chunks are reported to the BlockManagerMaster so that all the executors can
    - *  learn the location of those chunks. The first time the broadcast variable (sent as
    - *  part of task) is deserialized at a executor, all the chunks are fetched using
    - *  the BlockManager. When all the chunks are fetched (initially from the driver's
    - *  BlockManager), they are combined and deserialized to recreate the broadcasted data.
    - *  However, the chunks are also stored in the BlockManager and reported to the
    - *  BlockManagerMaster. As more executors fetch the chunks, BlockManagerMaster learns
    - *  multiple locations for each chunk. Hence, subsequent fetches of each chunk will be
    - *  made to other executors who already have those chunks, resulting in a distributed
    - *  fetching. This prevents the driver from being the bottleneck in sending out multiple
    - *  copies of the broadcast data (one per executor) as done by the
    - *  [[org.apache.spark.broadcast.HttpBroadcast]].
    + * A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]].
    + *
    + * The mechanism is as follows:
    + *
    + * The driver divides the serialized object into small chunks and
    + * stores those chunks in the BlockManager of the driver.
    + *
    + * On each executor, the executor first attempts to fetch the object from its BlockManager. If
    + * it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or
    + * other executors if available. Once it gets the chunks, it puts the chunks in its own
    + * BlockManager, ready for other executors to fetch from.
    + *
    + * This prevents the driver from being the bottleneck in sending out multiple copies of the
    + * broadcast data (one per executor) as done by the [[org.apache.spark.broadcast.HttpBroadcast]].
    + *
    + * @param obj object to broadcast
    + * @param isLocal whether Spark is running in local mode (single JVM process).
    + * @param id A unique identifier for the broadcast variable.
      */
     private[spark] class TorrentBroadcast[T: ClassTag](
    -    @transient var value_ : T, isLocal: Boolean, id: Long)
    +    obj : T,
    +    @transient private val isLocal: Boolean,
    +    id: Long)
       extends Broadcast[T](id) with Logging with Serializable {
     
    -  override protected def getValue() = value_
    +  override protected def getValue() = _value
    +
    +  /**
    +   * Value of the broadcast object. On driver, this is set directly by the constructor.
    +   * On executors, this is reconstructed by [[readObject]], which builds this value by reading
    +   * blocks from the driver and/or other executors.
    +   */
    +  @transient private var _value: T = obj
    +
    +  /** Total number of blocks this broadcast variable contains. */
    +  private val numBlocks: Int = writeBlocks()
     
       private val broadcastId = BroadcastBlockId(id)
     
    -  SparkEnv.get.blockManager.putSingle(
    -    broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
    +  /**
    +   * Divide the object into multiple blocks and put those blocks in the block manager.
    +   *
    +   * @return number of blocks this broadcast variable is divided into
    +   */
    +  private def writeBlocks(): Int = {
    +    val blocks = TorrentBroadcast.blockifyObject(_value)
    +    blocks.zipWithIndex.foreach { case (block, i) =>
    +      // TODO: Use putBytes directly.
    +      SparkEnv.get.blockManager.putSingle(
    +        BroadcastBlockId(id, "piece" + i),
    +        blocks(i),
    +        StorageLevel.MEMORY_AND_DISK_SER,
    +        tellMaster = true)
    +    }
    +    blocks.length
    +  }
     
    -  @transient private var arrayOfBlocks: Array[TorrentBlock] = null
    -  @transient private var totalBlocks = -1
    -  @transient private var totalBytes = -1
    -  @transient private var hasBlocks = 0
    +  /** Fetch torrent blocks from the driver and/or other executors. */
    +  private def readBlocks(): Array[Array[Byte]] = {
    +    // Fetch chunks of data. Note that all these chunks are stored in the BlockManager and reported
    +    // to the driver, so other executors can pull these thunks from this executor as well.
    --- End diff --
    
    Typo: thunks -> chunks
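
The chunking described in the quoted doc comment, together with the `SequenceInputStream` approach mentioned in the PR description, can be sketched roughly as follows. This is an illustration only, not the patch's code: `ChunkSketch`, `blockify`, `unblockify` and the `blockSize` parameter are hypothetical, and plain `ByteArrayInputStream`s stand in for whatever stream type the real implementation wraps around each block.

```scala
import java.io.{ByteArrayInputStream, InputStream, SequenceInputStream}
import java.util.{ArrayList => JArrayList, Collections}

object ChunkSketch {
  // Split serialized bytes into fixed-size chunks (the last one may be shorter).
  def blockify(bytes: Array[Byte], blockSize: Int): Array[Array[Byte]] = {
    require(blockSize > 0, "blockSize must be positive")
    bytes.grouped(blockSize).toArray
  }

  // Present the chunks as one logical stream without first copying them
  // into a single contiguous array.
  def unblockify(blocks: Array[Array[Byte]]): InputStream = {
    val streams = new JArrayList[InputStream]()
    blocks.foreach(b => streams.add(new ByteArrayInputStream(b)))
    new SequenceInputStream(Collections.enumeration(streams))
  }
}
```

A deserializer can then read from the returned stream as if the chunks were one array.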




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2030#discussion_r16433678
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -109,99 +155,30 @@ private[spark] class TorrentBroadcast[T: ClassTag](
       private def readObject(in: ObjectInputStream) {
         in.defaultReadObject()
         TorrentBroadcast.synchronized {
    --- End diff --
    
    Can't we relax the synchronized block so that it only wraps the local block manager interactions?
    Or is there a reason to keep it globally synchronized?
    
    Multiple cores per container would all block on this.




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52693773
  
    Currently running more jobs from the spark-perf suite against 5bacb9dbfab4ae5c83eb1874bd6fd6ae87ff4ad6 (the latest commit) as a stress-test to gain more confidence that this is bug-free.




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2030#discussion_r16434413
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -109,99 +155,30 @@ private[spark] class TorrentBroadcast[T: ClassTag](
       private def readObject(in: ObjectInputStream) {
         in.defaultReadObject()
         TorrentBroadcast.synchronized {
    --- End diff --
    
    How 'bad' is it if we allow multiple cores to fetch the same block? Particularly given that we shuffle the block fetches now (which is a really neat change, btw!)




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52681776
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18857/consoleFull) for   PR 2030 at commit [`5bacb9d`](https://github.com/apache/spark/commit/5bacb9dbfab4ae5c83eb1874bd6fd6ae87ff4ad6).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52663705
  
    Jenkins, retest this please.




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2030#discussion_r16435835
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -109,99 +155,30 @@ private[spark] class TorrentBroadcast[T: ClassTag](
       private def readObject(in: ObjectInputStream) {
         in.defaultReadObject()
         TorrentBroadcast.synchronized {
    --- End diff --
    
    Note, I am only referring to the case where none of the blocks are available locally (first fetch of one or more broadcasts).




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52693339
  
    New results (testing against 82577339dd58b5811eab5d10667775e61e37ff51, 1f1819b20f887b487557c31e54b8bcd95b582dc6, 1.0.2, and this):
    
     
    ![image](https://cloud.githubusercontent.com/assets/50748/3972582/df7adaac-27de-11e4-9642-d51fdddea6de.png)
    
    Means:
    
    ![image](https://cloud.githubusercontent.com/assets/50748/3972585/e90cb144-27de-11e4-8972-732f841c4292.png)
    
    Standard deviations:
    
    ![image](https://cloud.githubusercontent.com/assets/50748/3972594/f8983a8e-27de-11e4-855d-cc750f0178e1.png)
    
    
    





[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52673128
  
    Jenkins, retest this please.




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2030#discussion_r16461557
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -109,99 +155,30 @@ private[spark] class TorrentBroadcast[T: ClassTag](
       private def readObject(in: ObjectInputStream) {
         in.defaultReadObject()
         TorrentBroadcast.synchronized {
    --- End diff --
    
    Isn't this logic almost exactly what the CacheManager does already, fine grained locking on reconstructing specific blocks?




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by shivaram <gi...@git.apache.org>.
Github user shivaram commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52807817
  
    @aarondav The table and graph in https://github.com/apache/spark/pull/2030#issuecomment-52693339 compare pre-PR to post-PR. Actually, they break it down into three runs: pre-PR, after the 1st PR (#2028), and after the 2nd PR (#2030).
    
    For large closures this PR had little to no effect. For smaller closures we see an improvement from 19.7 ms before the PR to 17.9 ms after the 1st PR to 13.6 ms after the 2nd PR.
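
As a quick check on the small-closure figures quoted above, the relative reductions can be worked out as below. The three timings are taken from the comment; `ImprovementSketch` and `reduction` are hypothetical names for this illustration. The arithmetic gives roughly a 9% reduction from the 1st PR, a further 24% from the 2nd PR, and about 31% overall.

```scala
object ImprovementSketch {
  // Mean times in milliseconds, as quoted in the comment above.
  val beforeMs = 19.7
  val afterFirstPrMs = 17.9
  val afterSecondPrMs = 13.6

  // Fractional reduction when going from `from` to `to`.
  def reduction(from: Double, to: Double): Double = (from - to) / from

  val firstPr = reduction(beforeMs, afterFirstPrMs)         // about 0.091
  val secondPr = reduction(afterFirstPrMs, afterSecondPrMs) // about 0.240
  val overall = reduction(beforeMs, afterSecondPrMs)        // about 0.310
}
```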




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52598142
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18819/consoleFull) for   PR 2030 at commit [`3670f00`](https://github.com/apache/spark/commit/3670f002248faabf6d1b613204bb23c10454d9e6).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2030#discussion_r16434120
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -18,50 +18,116 @@
     package org.apache.spark.broadcast
     
     import java.io._
    +import java.nio.ByteBuffer
     
    +import scala.collection.JavaConversions.asJavaEnumeration
     import scala.reflect.ClassTag
     import scala.util.Random
     
     import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
     import org.apache.spark.io.CompressionCodec
     import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
    +import org.apache.spark.util.ByteBufferInputStream
     
     /**
    - *  A [[org.apache.spark.broadcast.Broadcast]] implementation that uses a BitTorrent-like
    - *  protocol to do a distributed transfer of the broadcasted data to the executors.
    - *  The mechanism is as follows. The driver divides the serializes the broadcasted data,
    - *  divides it into smaller chunks, and stores them in the BlockManager of the driver.
    - *  These chunks are reported to the BlockManagerMaster so that all the executors can
    - *  learn the location of those chunks. The first time the broadcast variable (sent as
    - *  part of task) is deserialized at a executor, all the chunks are fetched using
    - *  the BlockManager. When all the chunks are fetched (initially from the driver's
    - *  BlockManager), they are combined and deserialized to recreate the broadcasted data.
    - *  However, the chunks are also stored in the BlockManager and reported to the
    - *  BlockManagerMaster. As more executors fetch the chunks, BlockManagerMaster learns
    - *  multiple locations for each chunk. Hence, subsequent fetches of each chunk will be
    - *  made to other executors who already have those chunks, resulting in a distributed
    - *  fetching. This prevents the driver from being the bottleneck in sending out multiple
    - *  copies of the broadcast data (one per executor) as done by the
    - *  [[org.apache.spark.broadcast.HttpBroadcast]].
    + * A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]].
    + *
    + * The mechanism is as follows:
    + *
    + * The driver divides the serialized object into small chunks and
    + * stores those chunks in the BlockManager of the driver.
    + *
    + * On each executor, the executor first attempts to fetch the object from its BlockManager. If
    + * it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or
    + * other executors if available. Once it gets the chunks, it puts the chunks in its own
    + * BlockManager, ready for other executors to fetch from.
    + *
    + * This prevents the driver from being the bottleneck in sending out multiple copies of the
    + * broadcast data (one per executor) as done by the [[org.apache.spark.broadcast.HttpBroadcast]].
    --- End diff --
    
    Hmm, our broadcast variables are in the 1 MB - 3 MB range, so they do not qualify, I guess ... thanks for the comment.




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52665346
  
    @shivaram I'm actually going to re-run these tests this morning after restarting my cluster.  I'll test before and after #2028 and after this commit.  I can also test with a wider range of configurations if that would help to figure out what's going on.




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52742011
  
    @JoshRosen Did you ever see a significant difference between pre-PR and post-PR numbers? 1.0.2 was always much worse in all your graphs, but was this PR significant for larger blocks?
    
    By the way, 24 hour turnarounds on critical PRs that are going in at the end of a release cycle seem like a really bad way to preserve stability.




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2030#discussion_r16437861
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -109,99 +155,30 @@ private[spark] class TorrentBroadcast[T: ClassTag](
       private def readObject(in: ObjectInputStream) {
         in.defaultReadObject()
         TorrentBroadcast.synchronized {
    --- End diff --
    
    We can explore this in a later PR if it is not possible to do it right now and this change is targeted for 1.1, since it is obviously much better than what existed before.




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2030#discussion_r16434310
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -109,99 +155,30 @@ private[spark] class TorrentBroadcast[T: ClassTag](
       private def readObject(in: ObjectInputStream) {
         in.defaultReadObject()
         TorrentBroadcast.synchronized {
    --- End diff --
    
    The only thing we can improve here is to block only tasks fetching the same block. That'd help actually but requires more complexity. 
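
One way the finer-grained idea above could look, as a sketch under stated assumptions rather than anything from the patch: keep one lock object per broadcast id, so that only tasks reconstructing the same broadcast wait on each other. `PerBlockLockSketch` and `withBlockLock` are hypothetical names. The lock table here is never pruned, which hints at the extra complexity the comment mentions.

```scala
import java.util.concurrent.ConcurrentHashMap

object PerBlockLockSketch {
  // One lock object per broadcast id (hypothetical; entries are never removed).
  private val locks = new ConcurrentHashMap[Long, Object]()

  // Run `body` while holding the lock for `broadcastId` only.
  def withBlockLock[A](broadcastId: Long)(body: => A): A = {
    val fresh = new Object
    // putIfAbsent returns the previously stored lock, or null if ours was stored.
    val existing = locks.putIfAbsent(broadcastId, fresh)
    val lock = if (existing == null) fresh else existing
    lock.synchronized(body)
  }
}
```

Tasks working on different broadcast ids take different locks and so do not block each other, while two tasks on the same id are still serialized.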


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52597491
  
    Ran into some task failures when testing this commit on EC2 with the SchedulerThroughputTest:
    
    ```
    14/08/19 07:01:24 WARN scheduler.TaskSetManager: Lost task 5.0 in stage 995.0 (TID 9974, ip-172-31-24-16.us-west-2.compute.internal): java.io.IOException: FAILED_TO_UNCOMPRESS(5)
            org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
            org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
            org.xerial.snappy.Snappy.rawUncompress(Snappy.java:444)
            org.xerial.snappy.Snappy.uncompress(Snappy.java:480)
            org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:127)
            org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88)
            org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:58)
            org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:127)
            org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:214)
            org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:151)
            sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
            sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            java.lang.reflect.Method.invoke(Method.java:606)
            java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
            java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
            java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
            java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
            java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
            java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
            java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
            java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
            java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
            org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
            org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
            org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:184)
            java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
            java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
            java.lang.Thread.run(Thread.java:745)
    ```




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52742166
  
    FYI we actually reran all the mllib and scheduler throughput tests for this PR to verify.




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52610271
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18833/consoleFull) for   PR 2030 at commit [`5bacb9d`](https://github.com/apache/spark/commit/5bacb9dbfab4ae5c83eb1874bd6fd6ae87ff4ad6).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52677770
  
    @rxin I think we only benefit from broadcasting an RDD when the closure is big enough, such as more than 1M bytes. But in most cases, the closure should be less than 10k. How about we turn on broadcasting only when the closure is big enough? This may make the code a little more complicated, but it may be faster in most cases (small closures).
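
A minimal sketch of the size-based switch suggested above. This is not code from the PR: `ClosureShipping`, `BroadcastThresholdBytes`, and the 1 MB cutoff are hypothetical names and values chosen for illustration, and plain Java serialization stands in for whatever serializer the scheduler would actually use.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object ClosureShipping {
  // Hypothetical cutoff; the comment suggests something on the order of 1M bytes.
  val BroadcastThresholdBytes: Int = 1024 * 1024

  sealed trait Shipping
  case object Inline extends Shipping     // send the bytes along with every task
  case object Broadcast extends Shipping  // register once, let executors fetch

  /** Serialize with plain Java serialization to measure the payload size. */
  def serialize(obj: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    buffer.toByteArray
  }

  /** Pick a shipping strategy from the serialized size alone. */
  def choose(serialized: Array[Byte],
             threshold: Int = BroadcastThresholdBytes): Shipping =
    if (serialized.length >= threshold) Broadcast else Inline
}
```

The cost of this design is the one the comment already names: two code paths to maintain, plus a threshold that would likely need tuning per cluster.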




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2030#discussion_r16432099
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -18,50 +18,116 @@
     package org.apache.spark.broadcast
     
     import java.io._
    +import java.nio.ByteBuffer
     
    +import scala.collection.JavaConversions.asJavaEnumeration
     import scala.reflect.ClassTag
     import scala.util.Random
     
     import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
     import org.apache.spark.io.CompressionCodec
     import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
    +import org.apache.spark.util.ByteBufferInputStream
     
     /**
    - *  A [[org.apache.spark.broadcast.Broadcast]] implementation that uses a BitTorrent-like
    - *  protocol to do a distributed transfer of the broadcasted data to the executors.
    - *  The mechanism is as follows. The driver divides the serializes the broadcasted data,
    - *  divides it into smaller chunks, and stores them in the BlockManager of the driver.
    - *  These chunks are reported to the BlockManagerMaster so that all the executors can
    - *  learn the location of those chunks. The first time the broadcast variable (sent as
    - *  part of task) is deserialized at a executor, all the chunks are fetched using
    - *  the BlockManager. When all the chunks are fetched (initially from the driver's
    - *  BlockManager), they are combined and deserialized to recreate the broadcasted data.
    - *  However, the chunks are also stored in the BlockManager and reported to the
    - *  BlockManagerMaster. As more executors fetch the chunks, BlockManagerMaster learns
    - *  multiple locations for each chunk. Hence, subsequent fetches of each chunk will be
    - *  made to other executors who already have those chunks, resulting in a distributed
    - *  fetching. This prevents the driver from being the bottleneck in sending out multiple
    - *  copies of the broadcast data (one per executor) as done by the
    - *  [[org.apache.spark.broadcast.HttpBroadcast]].
    + * A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]].
    + *
    + * The mechanism is as follows:
    + *
    + * The driver divides the serialized object into small chunks and
    + * stores those chunks in the BlockManager of the driver.
    + *
    + * On each executor, the executor first attempts to fetch the object from its BlockManager. If
    + * it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or
    + * other executors if available. Once it gets the chunks, it puts the chunks in its own
    + * BlockManager, ready for other executors to fetch from.
    + *
    + * This prevents the driver from being the bottleneck in sending out multiple copies of the
    + * broadcast data (one per executor) as done by the [[org.apache.spark.broadcast.HttpBroadcast]].
    --- End diff --
    
    try a larger variable, like a 10MB or 100MB model.
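
For readers following the Scaladoc in the quoted diff, here is a rough sketch of the chunking idea: split the serialized bytes into fixed-size blocks, then deserialize straight from the blocks through a `java.io.SequenceInputStream` rather than copying them into one large array first. The names `Chunking`, `blockify`, and `unBlockify` are illustrative assumptions, not necessarily the PR's actual methods, and compression and the BlockManager are left out.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream,
  ObjectInputStream, ObjectOutputStream, SequenceInputStream}
import scala.collection.JavaConverters._

object Chunking {
  /** Serialize an object with plain Java serialization. */
  def serialize(obj: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    buffer.toByteArray
  }

  /** Split the serialized bytes into blocks of at most blockSize bytes. */
  def blockify(bytes: Array[Byte], blockSize: Int): Array[Array[Byte]] = {
    require(blockSize > 0, "blockSize must be positive")
    bytes.grouped(blockSize).toArray
  }

  /** Read the object back by chaining the blocks into one logical stream. */
  def unBlockify[T](blocks: Array[Array[Byte]]): T = {
    val streams: Iterator[InputStream] =
      blocks.iterator.map(b => new ByteArrayInputStream(b))
    val in = new ObjectInputStream(new SequenceInputStream(streams.asJavaEnumeration))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

A round trip such as `Chunking.unBlockify[String](Chunking.blockify(Chunking.serialize("model"), 4))` should give back the original value.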




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2030#discussion_r16435754
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -109,99 +155,30 @@ private[spark] class TorrentBroadcast[T: ClassTag](
       private def readObject(in: ObjectInputStream) {
         in.defaultReadObject()
         TorrentBroadcast.synchronized {
    --- End diff --
    
    Block size is limited to 4 MB, IIRC. Deserializing two 100 MB broadcasts (as mentioned in the use case) in parallel means the first has to complete before the second (or any other) can even start, so 50 requests have to finish (half of which are pulling 4 MB each).
    Even assuming both cores are pulling the same broadcast, with a reasonable shuffle and large enough sizes they could complement each other (best case, halving the time taken; worst case, doubling the IO, assuming a horrible shuffle or a single block).
    From a latency point of view, the current option is not very good when waves of tasks are getting scheduled and we have multiple cores per executor; from the point of view of network IO incurred, maybe it is better.
    The tradeoff question is which one dominates.
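
The request count in the comment above can be checked with a ceiling division. This is only a worked sketch of the arithmetic, taking the 4 MB block size and the two 100 MB broadcasts from the comment as given; `BlockCount` is a hypothetical helper.

```scala
object BlockCount {
  /** Number of fixed-size blocks needed to hold totalBytes (ceiling division). */
  def numBlocks(totalBytes: Long, blockSize: Long): Long = {
    require(blockSize > 0, "blockSize must be positive")
    (totalBytes + blockSize - 1) / blockSize
  }
}

object BlockCountExample {
  private val MB = 1024L * 1024L

  // 100 MB / 4 MB = 25 blocks per broadcast.
  val perBroadcast: Long = BlockCount.numBlocks(100 * MB, 4 * MB)

  // Two broadcasts fetched one after the other under a single lock.
  val total: Long = 2 * perBroadcast
}
```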




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52600436
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18825/consoleFull) for   PR 2030 at commit [`2d6a5fb`](https://github.com/apache/spark/commit/2d6a5fb19119402254f277bdfaba373d32444612).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52596008
  
    cc @shivaram @mosharaf 




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2030#discussion_r16461721
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -109,99 +155,30 @@ private[spark] class TorrentBroadcast[T: ClassTag](
       private def readObject(in: ObjectInputStream) {
         in.defaultReadObject()
         TorrentBroadcast.synchronized {
    --- End diff --
    
    Maybe we can generalize that to provide a lock manager, similar to what a DBMS would do.
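
One way such a lock manager might look, as a sketch only: one lock object per key (for example, per broadcast id), so that a slow read of one broadcast does not block reads of another. `KeyedLockManager` and `withLock` are hypothetical names, nothing like this is part of the PR, and a real DBMS lock manager would also handle lock modes, timeouts, and deadlock detection.

```scala
import java.util.concurrent.ConcurrentHashMap

/** Hands out one lock object per key, so unrelated keys never contend. */
class KeyedLockManager[K] {
  // Entries are never removed in this sketch; a real implementation
  // would need some eviction policy to avoid unbounded growth.
  private val locks = new ConcurrentHashMap[K, Object]()

  private def lockFor(key: K): Object = {
    val fresh = new Object
    val existing = locks.putIfAbsent(key, fresh)
    if (existing == null) fresh else existing
  }

  /** Run body while holding the lock for key; other keys stay unblocked. */
  def withLock[T](key: K)(body: => T): T = lockFor(key).synchronized(body)
}
```

With something like this, `readObject` could in principle guard its fetch with `locks.withLock(id) { ... }` instead of synchronizing on the companion object, at the price of more concurrent outstanding fetches per executor.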




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by shivaram <gi...@git.apache.org>.
Github user shivaram commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52664672
  
    @JoshRosen Thanks for testing. The perf results are a bit surprising (especially that the master branch became faster since the earlier one). I also realized we do 3 RPCs, as after fetching the block we report back to the master. We could see if this reporting can be made asynchronous in the future.
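
A sketch of what asynchronous reporting could look like, assuming the report does not have to complete before the task proceeds. `AsyncReport`, `fetch`, and `report` are hypothetical stand-ins, not BlockManager calls.

```scala
import scala.concurrent.{ExecutionContext, Future}

object AsyncReport {
  /**
   * Fetch a block, then report its new location in the background.
   * Returns the block immediately, plus a Future that completes
   * once the report has been sent.
   */
  def fetchThenReport[A](fetch: () => A, report: A => Unit)
                        (implicit ec: ExecutionContext): (A, Future[Unit]) = {
    val block = fetch()                   // synchronous: the caller needs the data
    val reported = Future(report(block))  // asynchronous: off the critical path
    (block, reported)
  }
}
```

The tradeoff is that other executors may learn about the new location slightly later, so a few extra fetches could still go to the driver.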




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52734661
  
    Ok merging in master & branch-1.1.





[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52664118
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18851/consoleFull) for   PR 2030 at commit [`5bacb9d`](https://github.com/apache/spark/commit/5bacb9dbfab4ae5c83eb1874bd6fd6ae87ff4ad6).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2030#discussion_r16432796
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -18,50 +18,116 @@
     package org.apache.spark.broadcast
     
     import java.io._
    +import java.nio.ByteBuffer
     
    +import scala.collection.JavaConversions.asJavaEnumeration
     import scala.reflect.ClassTag
     import scala.util.Random
     
     import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
     import org.apache.spark.io.CompressionCodec
     import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
    +import org.apache.spark.util.ByteBufferInputStream
     
     /**
    - *  A [[org.apache.spark.broadcast.Broadcast]] implementation that uses a BitTorrent-like
    - *  protocol to do a distributed transfer of the broadcasted data to the executors.
    - *  The mechanism is as follows. The driver divides the serializes the broadcasted data,
    - *  divides it into smaller chunks, and stores them in the BlockManager of the driver.
    - *  These chunks are reported to the BlockManagerMaster so that all the executors can
    - *  learn the location of those chunks. The first time the broadcast variable (sent as
    - *  part of task) is deserialized at a executor, all the chunks are fetched using
    - *  the BlockManager. When all the chunks are fetched (initially from the driver's
    - *  BlockManager), they are combined and deserialized to recreate the broadcasted data.
    - *  However, the chunks are also stored in the BlockManager and reported to the
    - *  BlockManagerMaster. As more executors fetch the chunks, BlockManagerMaster learns
    - *  multiple locations for each chunk. Hence, subsequent fetches of each chunk will be
    - *  made to other executors who already have those chunks, resulting in a distributed
    - *  fetching. This prevents the driver from being the bottleneck in sending out multiple
    - *  copies of the broadcast data (one per executor) as done by the
    - *  [[org.apache.spark.broadcast.HttpBroadcast]].
    + * A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]].
    + *
    + * The mechanism is as follows:
    + *
    + * The driver divides the serialized object into small chunks and
    + * stores those chunks in the BlockManager of the driver.
    + *
    + * On each executor, the executor first attempts to fetch the object from its BlockManager. If
    + * it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or
    + * other executors if available. Once it gets the chunks, it puts the chunks in its own
    + * BlockManager, ready for other executors to fetch from.
    + *
    + * This prevents the driver from being the bottleneck in sending out multiple copies of the
    + * broadcast data (one per executor) as done by the [[org.apache.spark.broadcast.HttpBroadcast]].
    + *
    + * @param obj object to broadcast
    + * @param isLocal whether Spark is running in local mode (single JVM process).
    + * @param id A unique identifier for the broadcast variable.
      */
     private[spark] class TorrentBroadcast[T: ClassTag](
    -    @transient var value_ : T, isLocal: Boolean, id: Long)
    +    obj : T,
    --- End diff --
    
    No, obj is not a field. It is just a ctor argument.
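
The distinction can be illustrated with a small standalone example. As far as I know, scalac only keeps a plain class parameter (no `val` or `var`) as a field when it is referenced outside the constructor body; the classes below are hypothetical and unrelated to `TorrentBroadcast` itself.

```scala
// Parameter used only while constructing the object: no field is kept.
class OnlyInConstructor(obj: String) {
  val length: Int = obj.length
}

// Same parameter referenced from a method: the compiler must retain it.
class UsedInMethod(obj: String) {
  def get: String = obj
}

object FieldNames {
  /** Names of the fields declared directly on the given class. */
  def of(cls: Class[_]): Set[String] =
    cls.getDeclaredFields.map(_.getName).toSet
}
```

Inspecting `FieldNames.of(classOf[OnlyInConstructor])` and `FieldNames.of(classOf[UsedInMethod])` shows which of the two classes carries `obj` as state, and therefore which one would drag it along during serialization.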




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52675858
  
    I tested this with a local cluster (which might not be a representative experiment), but this commit makes dummy short tasks finish in about 1/4 of the time. 
    
    ```
    rxin @ rxin-mbp : /scratch/rxin/spark-1 (torrentBroadcast) 
    > grep "took" new.txt 
    14/08/19 11:06:56 INFO SparkContext: Job finished: count at <console>:13, took 0.169695 s
    14/08/19 11:06:58 INFO SparkContext: Job finished: count at <console>:13, took 0.010982 s
    14/08/19 11:06:59 INFO SparkContext: Job finished: count at <console>:13, took 0.010995 s
    14/08/19 11:07:00 INFO SparkContext: Job finished: count at <console>:13, took 0.009852 s
    14/08/19 11:07:01 INFO SparkContext: Job finished: count at <console>:13, took 0.007796 s
    14/08/19 11:07:02 INFO SparkContext: Job finished: count at <console>:13, took 0.009224 s
    14/08/19 11:07:03 INFO SparkContext: Job finished: count at <console>:13, took 0.007944 s
    14/08/19 11:07:03 INFO SparkContext: Job finished: count at <console>:13, took 0.011442 s
    14/08/19 11:07:04 INFO SparkContext: Job finished: count at <console>:13, took 0.007945 s
    14/08/19 11:07:05 INFO SparkContext: Job finished: count at <console>:13, took 0.008433 s
    14/08/19 11:07:06 INFO SparkContext: Job finished: count at <console>:13, took 0.008017 s
    14/08/19 11:07:06 INFO SparkContext: Job finished: count at <console>:13, took 0.0085 s
    14/08/19 11:07:07 INFO SparkContext: Job finished: count at <console>:13, took 0.008712 s
    14/08/19 11:07:08 INFO SparkContext: Job finished: count at <console>:13, took 0.008481 s
    14/08/19 11:07:09 INFO SparkContext: Job finished: count at <console>:13, took 0.007848 s
    14/08/19 11:07:10 INFO SparkContext: Job finished: count at <console>:13, took 0.008487 s
    14/08/19 11:07:10 INFO SparkContext: Job finished: count at <console>:13, took 0.00756 s
    14/08/19 11:07:11 INFO SparkContext: Job finished: count at <console>:13, took 0.008702 s
    14/08/19 11:07:12 INFO SparkContext: Job finished: count at <console>:13, took 0.007459 s
    14/08/19 11:07:13 INFO SparkContext: Job finished: count at <console>:13, took 0.007884 s
    14/08/19 11:07:14 INFO SparkContext: Job finished: count at <console>:13, took 0.007632 s
    ```
    
    ```
    rxin @ rxin-mbp : /scratch/rxin/spark-1 (torrentBroadcast) 
    > grep "took" old.txt 
    14/08/19 11:12:12 INFO SparkContext: Job finished: count at <console>:13, took 0.316267 s
    14/08/19 11:12:14 INFO SparkContext: Job finished: count at <console>:13, took 0.035347 s
    14/08/19 11:12:15 INFO SparkContext: Job finished: count at <console>:13, took 0.038807 s
    14/08/19 11:12:16 INFO SparkContext: Job finished: count at <console>:13, took 0.071991 s
    14/08/19 11:12:16 INFO SparkContext: Job finished: count at <console>:13, took 0.036433 s
    14/08/19 11:12:17 INFO SparkContext: Job finished: count at <console>:13, took 0.036449 s
    14/08/19 11:12:29 INFO SparkContext: Job finished: count at <console>:13, took 0.076298 s
    14/08/19 11:12:29 INFO SparkContext: Job finished: count at <console>:13, took 0.033312 s
    14/08/19 11:12:30 INFO SparkContext: Job finished: count at <console>:13, took 0.036079 s
    14/08/19 11:12:31 INFO SparkContext: Job finished: count at <console>:13, took 0.033334 s
    14/08/19 11:13:15 INFO SparkContext: Job finished: count at <console>:13, took 0.47008 s
    14/08/19 11:13:17 INFO SparkContext: Job finished: count at <console>:13, took 0.033396 s
    14/08/19 11:13:18 INFO SparkContext: Job finished: count at <console>:13, took 0.036782 s
    14/08/19 11:13:19 INFO SparkContext: Job finished: count at <console>:13, took 0.033261 s
    14/08/19 11:13:19 INFO SparkContext: Job finished: count at <console>:13, took 0.032565 s
    14/08/19 11:13:20 INFO SparkContext: Job finished: count at <console>:13, took 0.033861 s
    14/08/19 11:13:54 INFO SparkContext: Job finished: count at <console>:13, took 0.077148 s
    14/08/19 11:14:03 INFO SparkContext: Job finished: count at <console>:13, took 0.031572 s
    ```
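
To compare two such logs without eyeballing them, the durations can be pulled out and summarized. This is a small sketch: `TookTimes` is a hypothetical helper, and the median is used here because the first job in each run includes warm-up cost.

```scala
object TookTimes {
  private val Took = """took ([0-9.]+) s""".r

  /** Extract the "took N s" durations (in seconds) from log lines. */
  def parse(lines: Seq[String]): Seq[Double] =
    lines.flatMap(line => Took.findFirstMatchIn(line).map(_.group(1).toDouble))

  /** Median, which is less sensitive than the mean to warm-up outliers. */
  def median(xs: Seq[Double]): Double = {
    require(xs.nonEmpty, "median of empty sequence")
    val sorted = xs.sorted
    val mid = sorted.length / 2
    if (sorted.length % 2 == 1) sorted(mid)
    else (sorted(mid - 1) + sorted(mid)) / 2.0
  }
}
```

Feeding it the lines of `new.txt` and `old.txt` (for example via `scala.io.Source.fromFile(path).getLines().toSeq`) and dividing the two medians gives a single ratio to set against the "about 1/4 of the time" estimate.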





[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52599404
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18816/consoleFull) for   PR 2030 at commit [`c1185cd`](https://github.com/apache/spark/commit/c1185cded8224470f06abb4e345df5ebbd87db68).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-58201237
  
    It could be fixed by https://github.com/apache/spark/pull/2624
    
    It's strange that I can not see this comment on PR #2030.
    
    On Tue, Oct 7, 2014 at 6:28 AM, DB Tsai <no...@github.com> wrote:
    
    > We had a build against the spark master on Oct 2, and when ran our
    > application with data around 600GB, we got the following exception. Does
    > this PR fix this issue which is seen by @JoshRosen
    > <https://github.com/JoshRosen>
    >
    > Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 8312, ams03-002.ff): java.io.IOException: PARSING_ERROR(2)
    >     org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
    >     org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
    >     org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:594)
    >     org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:125)
    >     org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88)
    >     org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:58)
    >     org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128)
    >     org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1004)
    >     org.apache.spark.storage.ShuffleBlockFetcherIterator$$anon$1$$anonfun$onBlockFetchSuccess$1.apply(ShuffleBlockFetcherIterator.scala:116)
    >     org.apache.spark.storage.ShuffleBlockFetcherIterator$$anon$1$$anonfun$onBlockFetchSuccess$1.apply(ShuffleBlockFetcherIterator.scala:115)
    >     org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:243)
    >     org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:52)
    >     scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    >     org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
    >     org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    >     org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:89)
    >     org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44)
    >     org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
    >     org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    >     org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    >     org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    >     org.apache.spark.scheduler.Task.run(Task.scala:56)
    >     org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
    >     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    >     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    >     java.lang.Thread.run(Thread.java:744)
    >
    > Driver stacktrace:
    >
    > --
    > Reply to this email directly or view it on GitHub
    > <https://github.com/apache/spark/pull/2030#issuecomment-58183559>.
    >
    
    
    
    -- 
     - Davies




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52671331
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18851/consoleFull) for   PR 2030 at commit [`5bacb9d`](https://github.com/apache/spark/commit/5bacb9dbfab4ae5c83eb1874bd6fd6ae87ff4ad6).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52673112
  
    Ok the test failures this time are just because of flaky Python tests....




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by shivaram <gi...@git.apache.org>.
Github user shivaram commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2030#discussion_r16433991
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -109,99 +155,30 @@ private[spark] class TorrentBroadcast[T: ClassTag](
       private def readObject(in: ObjectInputStream) {
         in.defaultReadObject()
         TorrentBroadcast.synchronized {
    --- End diff --
    
    I am not sure there is a lot of benefit from having multiple cores execute `readObject`. We'll then need synchronization for co-ordinating which piece is being fetched by each thread and cap the number of total outstanding requests etc.




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/2030




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2030#discussion_r16434246
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -109,99 +155,30 @@ private[spark] class TorrentBroadcast[T: ClassTag](
       private def readObject(in: ObjectInputStream) {
         in.defaultReadObject()
         TorrentBroadcast.synchronized {
    --- End diff --
    
    The lock is not on a specific instance but on the global class object, if I am not wrong.
    A single slow readObject will block all other cores waiting for the lock to be released.




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2030#discussion_r16438557
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -109,99 +155,30 @@ private[spark] class TorrentBroadcast[T: ClassTag](
       private def readObject(in: ObjectInputStream) {
         in.defaultReadObject()
         TorrentBroadcast.synchronized {
    --- End diff --
    
    yup agree!




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2030#discussion_r16432379
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -18,50 +18,116 @@
     package org.apache.spark.broadcast
     
     import java.io._
    +import java.nio.ByteBuffer
     
    +import scala.collection.JavaConversions.asJavaEnumeration
     import scala.reflect.ClassTag
     import scala.util.Random
     
     import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
     import org.apache.spark.io.CompressionCodec
     import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
    +import org.apache.spark.util.ByteBufferInputStream
     
     /**
    - *  A [[org.apache.spark.broadcast.Broadcast]] implementation that uses a BitTorrent-like
    - *  protocol to do a distributed transfer of the broadcasted data to the executors.
    - *  The mechanism is as follows. The driver divides the serializes the broadcasted data,
    - *  divides it into smaller chunks, and stores them in the BlockManager of the driver.
    - *  These chunks are reported to the BlockManagerMaster so that all the executors can
    - *  learn the location of those chunks. The first time the broadcast variable (sent as
    - *  part of task) is deserialized at a executor, all the chunks are fetched using
    - *  the BlockManager. When all the chunks are fetched (initially from the driver's
    - *  BlockManager), they are combined and deserialized to recreate the broadcasted data.
    - *  However, the chunks are also stored in the BlockManager and reported to the
    - *  BlockManagerMaster. As more executors fetch the chunks, BlockManagerMaster learns
    - *  multiple locations for each chunk. Hence, subsequent fetches of each chunk will be
    - *  made to other executors who already have those chunks, resulting in a distributed
    - *  fetching. This prevents the driver from being the bottleneck in sending out multiple
    - *  copies of the broadcast data (one per executor) as done by the
    - *  [[org.apache.spark.broadcast.HttpBroadcast]].
    + * A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]].
    + *
    + * The mechanism is as follows:
    + *
    + * The driver divides the serialized object into small chunks and
    + * stores those chunks in the BlockManager of the driver.
    + *
    + * On each executor, the executor first attempts to fetch the object from its BlockManager. If
    + * it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or
    + * other executors if available. Once it gets the chunks, it puts the chunks in its own
    + * BlockManager, ready for other executors to fetch from.
    + *
    + * This prevents the driver from being the bottleneck in sending out multiple copies of the
    + * broadcast data (one per executor) as done by the [[org.apache.spark.broadcast.HttpBroadcast]].
    + *
    + * @param obj object to broadcast
    + * @param isLocal whether Spark is running in local mode (single JVM process).
    + * @param id A unique identifier for the broadcast variable.
      */
     private[spark] class TorrentBroadcast[T: ClassTag](
    -    @transient var value_ : T, isLocal: Boolean, id: Long)
    +    obj : T,
    --- End diff --
    
    And @transient? Not sure if I am missing something here.
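
For context on the question, here is a small sketch of how `@transient` behaves under plain Java serialization. `TransientDemo`, `Holder` and `roundTrip` are hypothetical stand-ins, not Spark classes. In Scala a plain constructor parameter that is only read during construction is normally not kept as a field, which may be why `obj` carries no annotation in the diff; the sketch only checks the `@transient` field itself.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object TransientDemo {
  // Hypothetical stand-in for a broadcast wrapper; not Spark's class.
  class Holder(obj: String, val id: Long) extends Serializable {
    // Skipped by Java serialization, so it is null after deserialization
    // unless a custom readObject rebuilds it.
    @transient private var _value: String = obj
    def value: String = _value
  }

  /** Serializes `h` to bytes and reads it back with plain Java serialization. */
  def roundTrip(h: Holder): Holder = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(h)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[Holder] finally in.close()
  }
}
```

After a round trip the `id` survives and `value` comes back as `null`, which is the gap that a custom `readObject` has to fill on the executor side.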




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2030#discussion_r16431857
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -18,50 +18,116 @@
     package org.apache.spark.broadcast
     
     import java.io._
    +import java.nio.ByteBuffer
     
    +import scala.collection.JavaConversions.asJavaEnumeration
     import scala.reflect.ClassTag
     import scala.util.Random
     
     import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
     import org.apache.spark.io.CompressionCodec
     import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
    +import org.apache.spark.util.ByteBufferInputStream
     
     /**
    - *  A [[org.apache.spark.broadcast.Broadcast]] implementation that uses a BitTorrent-like
    - *  protocol to do a distributed transfer of the broadcasted data to the executors.
    - *  The mechanism is as follows. The driver divides the serializes the broadcasted data,
    - *  divides it into smaller chunks, and stores them in the BlockManager of the driver.
    - *  These chunks are reported to the BlockManagerMaster so that all the executors can
    - *  learn the location of those chunks. The first time the broadcast variable (sent as
    - *  part of task) is deserialized at a executor, all the chunks are fetched using
    - *  the BlockManager. When all the chunks are fetched (initially from the driver's
    - *  BlockManager), they are combined and deserialized to recreate the broadcasted data.
    - *  However, the chunks are also stored in the BlockManager and reported to the
    - *  BlockManagerMaster. As more executors fetch the chunks, BlockManagerMaster learns
    - *  multiple locations for each chunk. Hence, subsequent fetches of each chunk will be
    - *  made to other executors who already have those chunks, resulting in a distributed
    - *  fetching. This prevents the driver from being the bottleneck in sending out multiple
    - *  copies of the broadcast data (one per executor) as done by the
    - *  [[org.apache.spark.broadcast.HttpBroadcast]].
    + * A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]].
    + *
    + * The mechanism is as follows:
    + *
    + * The driver divides the serialized object into small chunks and
    + * stores those chunks in the BlockManager of the driver.
    + *
    + * On each executor, the executor first attempts to fetch the object from its BlockManager. If
    + * it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or
    + * other executors if available. Once it gets the chunks, it puts the chunks in its own
    + * BlockManager, ready for other executors to fetch from.
    + *
    + * This prevents the driver from being the bottleneck in sending out multiple copies of the
    + * broadcast data (one per executor) as done by the [[org.apache.spark.broadcast.HttpBroadcast]].
    --- End diff --
    
    Just curious: have we observed the master becoming a bottleneck for HTTP broadcast?
    We have run with about 600 executor cores concurrently and have not seen this (the tasks themselves finish in under a second), though that is probably too small a sample to draw conclusions from.
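
The chunking mechanism described in the quoted Scaladoc can be sketched roughly as follows. This is an illustration under assumptions, not the PR's code: `ChunkDemo`, `blockify`, `asStream` and `readAll` are hypothetical names, and the sketch works on in-memory byte arrays only, with no BlockManager, compression or remote fetch. It shows the serialized bytes being split into fixed-size chunks and then read back through `java.io.SequenceInputStream`, which avoids concatenating the chunks into one large array first.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, SequenceInputStream}

object ChunkDemo {
  /** Splits `bytes` into consecutive chunks of at most `chunkSize` bytes. */
  def blockify(bytes: Array[Byte], chunkSize: Int): Array[Array[Byte]] = {
    require(chunkSize > 0, "chunkSize must be positive")
    bytes.grouped(chunkSize).toArray
  }

  /** Presents the chunks as one stream without first copying them into a single array. */
  def asStream(chunks: Array[Array[Byte]]): InputStream = {
    val it = chunks.iterator
    new SequenceInputStream(new java.util.Enumeration[InputStream] {
      def hasMoreElements(): Boolean = it.hasNext
      def nextElement(): InputStream = new ByteArrayInputStream(it.next())
    })
  }

  /** Drains a stream fully; used here only to check the round trip. */
  def readAll(in: InputStream): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val buf = new Array[Byte](4096)
    var n = in.read(buf)
    while (n != -1) { out.write(buf, 0, n); n = in.read(buf) }
    out.toByteArray
  }
}
```

In a real broadcast the stream returned by `asStream` would be handed to the deserializer directly; `readAll` exists only so the round trip can be checked.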




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52602155
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18827/consoleFull) for   PR 2030 at commit [`0d8ed5b`](https://github.com/apache/spark/commit/0d8ed5ba059ed530dc7e32ba946f92f827a4467f).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52606422
  
    Benchmarked as of 0d8ed5b and the results aren't conclusively faster than `master`; the good news is that we've narrowed the gap that I saw earlier between `master` and v1.0.2 for small jobs.
    
    Each bar here represents a test where I ran 100 back-to-back jobs, each with 10 tasks, and varied the size of the task’s closure (each bar is the average of 10 runs, ignoring the first run to allow for JIT / warmup).  The closure sizes (x-axis) are empty (well, whatever the minimum size was), 1 megabyte, and 10 megabytes; y-axis is time (seconds).  This is running on 10 r3.2xlarge nodes in EC2.  The test code is based off of my modified version of spark-perf (https://github.com/JoshRosen/spark-perf/commit/0e768b2e03bfb3eeb421397e6e0fe93082879ef8)
    
    ![image](https://cloud.githubusercontent.com/assets/50748/3963508/3cc0d430-277d-11e4-9109-9efd98a8b30e.png)
    
    Or, in tabular form, the means:
    
    ![image](https://cloud.githubusercontent.com/assets/50748/3963514/5566af50-277d-11e4-9214-08ea57058c2d.png)
    
    and standard deviations:
    
    ![image](https://cloud.githubusercontent.com/assets/50748/3963521/74a6dc82-277d-11e4-9a85-e3f5c1597242.png)
    
    Keep in mind that this is running 100 back-to-back jobs; for example, v1.0.2 averaged 9ms per job for the small jobs.
    
    I'll run these benchmarks again tomorrow morning when I'm less tired to make sure I haven't inadvertently misconfigured anything.
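
The measurement procedure described above (repeat a workload, drop the first run as warmup, report mean and standard deviation) can be sketched as below. This is not the spark-perf code; `BenchDemo` and its methods are hypothetical names, and the sketch assumes the warmup run is excluded from the averaged runs and that a sample standard deviation is wanted.

```scala
object BenchDemo {
  /** Wall-clock seconds taken by `body`. */
  def timeSeconds(body: => Unit): Double = {
    val start = System.nanoTime()
    body
    (System.nanoTime() - start) / 1e9
  }

  /** Times `runs` executions and drops the first one as JIT warmup. */
  def measure(runs: Int)(body: => Unit): Seq[Double] = {
    require(runs >= 2, "need at least one run after the warmup run")
    Seq.fill(runs)(timeSeconds(body)).drop(1)
  }

  def mean(xs: Seq[Double]): Double = xs.sum / xs.size

  /** Sample standard deviation (n - 1 in the denominator). */
  def stddev(xs: Seq[Double]): Double = {
    val m = mean(xs)
    if (xs.size < 2) 0.0
    else math.sqrt(xs.map(x => (x - m) * (x - m)).sum / (xs.size - 1))
  }
}
```

Here `body` would be the loop that submits the 100 back-to-back jobs for one closure size, so each sample is the total time for that batch rather than for a single job.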




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52678684
  
    Actually with this change my local cluster testing didn't see much difference between torrent and http.
    





[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2030#discussion_r16432102
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -18,50 +18,116 @@
     package org.apache.spark.broadcast
     
     import java.io._
    +import java.nio.ByteBuffer
     
    +import scala.collection.JavaConversions.asJavaEnumeration
     import scala.reflect.ClassTag
     import scala.util.Random
     
     import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
     import org.apache.spark.io.CompressionCodec
     import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
    +import org.apache.spark.util.ByteBufferInputStream
     
     /**
    - *  A [[org.apache.spark.broadcast.Broadcast]] implementation that uses a BitTorrent-like
    - *  protocol to do a distributed transfer of the broadcasted data to the executors.
    - *  The mechanism is as follows. The driver divides the serializes the broadcasted data,
    - *  divides it into smaller chunks, and stores them in the BlockManager of the driver.
    - *  These chunks are reported to the BlockManagerMaster so that all the executors can
    - *  learn the location of those chunks. The first time the broadcast variable (sent as
    - *  part of task) is deserialized at a executor, all the chunks are fetched using
    - *  the BlockManager. When all the chunks are fetched (initially from the driver's
    - *  BlockManager), they are combined and deserialized to recreate the broadcasted data.
    - *  However, the chunks are also stored in the BlockManager and reported to the
    - *  BlockManagerMaster. As more executors fetch the chunks, BlockManagerMaster learns
    - *  multiple locations for each chunk. Hence, subsequent fetches of each chunk will be
    - *  made to other executors who already have those chunks, resulting in a distributed
    - *  fetching. This prevents the driver from being the bottleneck in sending out multiple
    - *  copies of the broadcast data (one per executor) as done by the
    - *  [[org.apache.spark.broadcast.HttpBroadcast]].
    + * A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]].
    + *
    + * The mechanism is as follows:
    + *
    + * The driver divides the serialized object into small chunks and
    + * stores those chunks in the BlockManager of the driver.
    + *
    + * On each executor, the executor first attempts to fetch the object from its BlockManager. If
    + * it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or
    + * other executors if available. Once it gets the chunks, it puts the chunks in its own
    + * BlockManager, ready for other executors to fetch from.
    + *
    + * This prevents the driver from being the bottleneck in sending out multiple copies of the
    + * broadcast data (one per executor) as done by the [[org.apache.spark.broadcast.HttpBroadcast]].
    + *
    + * @param obj object to broadcast
    + * @param isLocal whether Spark is running in local mode (single JVM process).
    + * @param id A unique identifier for the broadcast variable.
      */
     private[spark] class TorrentBroadcast[T: ClassTag](
    -    @transient var value_ : T, isLocal: Boolean, id: Long)
    +    obj : T,
    +    @transient private val isLocal: Boolean,
    +    id: Long)
       extends Broadcast[T](id) with Logging with Serializable {
     
    -  override protected def getValue() = value_
    +  /**
    +   * Value of the broadcast object. On driver, this is set directly by the constructor.
    +   * On executors, this is reconstructed by [[readObject]], which builds this value by reading
    +   * blocks from the driver and/or other executors.
    +   */
    +  @transient private var _value: T = obj
    --- End diff --
    
    Rename this? A prefix underscore is going to cause confusion.




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2030#discussion_r16432354
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -18,50 +18,116 @@
     package org.apache.spark.broadcast
     
     import java.io._
    +import java.nio.ByteBuffer
     
    +import scala.collection.JavaConversions.asJavaEnumeration
     import scala.reflect.ClassTag
     import scala.util.Random
     
     import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
     import org.apache.spark.io.CompressionCodec
     import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
    +import org.apache.spark.util.ByteBufferInputStream
     
     /**
    - *  A [[org.apache.spark.broadcast.Broadcast]] implementation that uses a BitTorrent-like
    - *  protocol to do a distributed transfer of the broadcasted data to the executors.
    - *  The mechanism is as follows. The driver divides the serializes the broadcasted data,
    - *  divides it into smaller chunks, and stores them in the BlockManager of the driver.
    - *  These chunks are reported to the BlockManagerMaster so that all the executors can
    - *  learn the location of those chunks. The first time the broadcast variable (sent as
    - *  part of task) is deserialized at a executor, all the chunks are fetched using
    - *  the BlockManager. When all the chunks are fetched (initially from the driver's
    - *  BlockManager), they are combined and deserialized to recreate the broadcasted data.
    - *  However, the chunks are also stored in the BlockManager and reported to the
    - *  BlockManagerMaster. As more executors fetch the chunks, BlockManagerMaster learns
    - *  multiple locations for each chunk. Hence, subsequent fetches of each chunk will be
    - *  made to other executors who already have those chunks, resulting in a distributed
    - *  fetching. This prevents the driver from being the bottleneck in sending out multiple
    - *  copies of the broadcast data (one per executor) as done by the
    - *  [[org.apache.spark.broadcast.HttpBroadcast]].
    + * A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]].
    + *
    + * The mechanism is as follows:
    + *
    + * The driver divides the serialized object into small chunks and
    + * stores those chunks in the BlockManager of the driver.
    + *
    + * On each executor, the executor first attempts to fetch the object from its BlockManager. If
    + * it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or
    + * other executors if available. Once it gets the chunks, it puts the chunks in its own
    + * BlockManager, ready for other executors to fetch from.
    + *
    + * This prevents the driver from being the bottleneck in sending out multiple copies of the
    + * broadcast data (one per executor) as done by the [[org.apache.spark.broadcast.HttpBroadcast]].
    + *
    + * @param obj object to broadcast
    + * @param isLocal whether Spark is running in local mode (single JVM process).
    + * @param id A unique identifier for the broadcast variable.
      */
     private[spark] class TorrentBroadcast[T: ClassTag](
    -    @transient var value_ : T, isLocal: Boolean, id: Long)
    +    obj : T,
    +    @transient private val isLocal: Boolean,
    +    id: Long)
       extends Broadcast[T](id) with Logging with Serializable {
     
    -  override protected def getValue() = value_
    +  /**
    +   * Value of the broadcast object. On driver, this is set directly by the constructor.
    +   * On executors, this is reconstructed by [[readObject]], which builds this value by reading
    +   * blocks from the driver and/or other executors.
    +   */
    +  @transient private var _value: T = obj
    --- End diff --
    
    What's the confusion? It seems common to use a prefix underscore to indicate a private variable.




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52670386
  
    Thanks for looking into this. If you can't find a difference after verifying the results, we should probably hold this change for 1.2.




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52601987
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18819/consoleFull) for   PR 2030 at commit [`3670f00`](https://github.com/apache/spark/commit/3670f002248faabf6d1b613204bb23c10454d9e6).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2030#discussion_r16434924
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -109,99 +155,30 @@ private[spark] class TorrentBroadcast[T: ClassTag](
       private def readObject(in: ObjectInputStream) {
         in.defaultReadObject()
         TorrentBroadcast.synchronized {
    --- End diff --
    
    That can be pretty bad for large blocks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52596109
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18816/consoleFull) for   PR 2030 at commit [`c1185cd`](https://github.com/apache/spark/commit/c1185cded8224470f06abb4e345df5ebbd87db68).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2030#discussion_r16433411
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -18,50 +18,116 @@
     package org.apache.spark.broadcast
     
     import java.io._
    +import java.nio.ByteBuffer
     
    +import scala.collection.JavaConversions.asJavaEnumeration
     import scala.reflect.ClassTag
     import scala.util.Random
     
     import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
     import org.apache.spark.io.CompressionCodec
     import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
    +import org.apache.spark.util.ByteBufferInputStream
     
     /**
    - *  A [[org.apache.spark.broadcast.Broadcast]] implementation that uses a BitTorrent-like
    - *  protocol to do a distributed transfer of the broadcasted data to the executors.
    - *  The mechanism is as follows. The driver divides the serializes the broadcasted data,
    - *  divides it into smaller chunks, and stores them in the BlockManager of the driver.
    - *  These chunks are reported to the BlockManagerMaster so that all the executors can
    - *  learn the location of those chunks. The first time the broadcast variable (sent as
    - *  part of task) is deserialized at a executor, all the chunks are fetched using
    - *  the BlockManager. When all the chunks are fetched (initially from the driver's
    - *  BlockManager), they are combined and deserialized to recreate the broadcasted data.
    - *  However, the chunks are also stored in the BlockManager and reported to the
    - *  BlockManagerMaster. As more executors fetch the chunks, BlockManagerMaster learns
    - *  multiple locations for each chunk. Hence, subsequent fetches of each chunk will be
    - *  made to other executors who already have those chunks, resulting in a distributed
    - *  fetching. This prevents the driver from being the bottleneck in sending out multiple
    - *  copies of the broadcast data (one per executor) as done by the
    - *  [[org.apache.spark.broadcast.HttpBroadcast]].
    + * A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]].
    + *
    + * The mechanism is as follows:
    + *
    + * The driver divides the serialized object into small chunks and
    + * stores those chunks in the BlockManager of the driver.
    + *
    + * On each executor, the executor first attempts to fetch the object from its BlockManager. If
    + * it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or
    + * other executors if available. Once it gets the chunks, it puts the chunks in its own
    + * BlockManager, ready for other executors to fetch from.
    + *
    + * This prevents the driver from being the bottleneck in sending out multiple copies of the
    + * broadcast data (one per executor) as done by the [[org.apache.spark.broadcast.HttpBroadcast]].
    --- End diff --
    
    To add: the latency of torrent broadcast is "sort of" twice that of HTTP (2 requests instead of 1), assuming a single piece for both, which is why I am curious about the bottleneck.
    Currently we run our jobs with HTTP broadcast explicitly, since our tasks are low-latency, short-duration tasks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by shivaram <gi...@git.apache.org>.
Github user shivaram commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52666365
  
    I think just this patch vs. before #2028 vs. 1.0.2 should be fine. I just wanted to make sure the performance regression is minimal due to broadcast -- so as long as we are close or better than 1.0.2 numbers it should be good.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52715181
  
    Here are some updated results that show performance for a wider range of closure sizes (all in bytes):
    
    ![image](https://cloud.githubusercontent.com/assets/50748/3974720/3b83d1d6-27f9-11e4-8771-a151a7871772.png)
    
    ![image](https://cloud.githubusercontent.com/assets/50748/3974721/46455ebe-27f9-11e4-9de2-2a006eee0793.png)
    





[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by shivaram <gi...@git.apache.org>.
Github user shivaram commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2030#discussion_r16399782
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -109,99 +137,30 @@ private[spark] class TorrentBroadcast[T: ClassTag](
       private def readObject(in: ObjectInputStream) {
         in.defaultReadObject()
         TorrentBroadcast.synchronized {
    -      SparkEnv.get.blockManager.getSingle(broadcastId) match {
    +      SparkEnv.get.blockManager.getLocal(broadcastId).map(_.data.next()) match {
             case Some(x) =>
    -          value_ = x.asInstanceOf[T]
    +          _value = x.asInstanceOf[T]
     
             case None =>
    -          val start = System.nanoTime
               logInfo("Started reading broadcast variable " + id)
    -
    -          // Initialize @transient variables that will receive garbage values from the master.
    -          resetWorkerVariables()
    -
    -          if (receiveBroadcast()) {
    -            value_ = TorrentBroadcast.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks)
    -
    -            /* Store the merged copy in cache so that the next worker doesn't need to rebuild it.
    -             * This creates a trade-off between memory usage and latency. Storing copy doubles
    -             * the memory footprint; not storing doubles deserialization cost. Also,
    -             * this does not need to be reported to BlockManagerMaster since other executors
    -             * does not need to access this block (they only need to fetch the chunks,
    -             * which are reported).
    -             */
    -            SparkEnv.get.blockManager.putSingle(
    -              broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
    -
    -            // Remove arrayOfBlocks from memory once value_ is on local cache
    -            resetWorkerVariables()
    -          } else {
    -            logError("Reading broadcast variable " + id + " failed")
    -          }
    -
    -          val time = (System.nanoTime - start) / 1e9
    +          val start = System.nanoTime()
    +          val blocks = readBlocks()
    +          val time = (System.nanoTime() - start) / 1e9
               logInfo("Reading broadcast variable " + id + " took " + time + " s")
    -      }
    -    }
    -  }
    -
    -  private def resetWorkerVariables() {
    -    arrayOfBlocks = null
    -    totalBytes = -1
    -    totalBlocks = -1
    -    hasBlocks = 0
    -  }
    -
    -  private def receiveBroadcast(): Boolean = {
    -    // Receive meta-info about the size of broadcast data,
    -    // the number of chunks it is divided into, etc.
    -    val metaId = BroadcastBlockId(id, "meta")
    -    var attemptId = 10
    -    while (attemptId > 0 && totalBlocks == -1) {
    -      SparkEnv.get.blockManager.getSingle(metaId) match {
    -        case Some(x) =>
    -          val tInfo = x.asInstanceOf[TorrentInfo]
    -          totalBlocks = tInfo.totalBlocks
    -          totalBytes = tInfo.totalBytes
    -          arrayOfBlocks = new Array[TorrentBlock](totalBlocks)
    -          hasBlocks = 0
    -
    -        case None =>
    -          Thread.sleep(500)
    -      }
    -      attemptId -= 1
    -    }
    -
    -    if (totalBlocks == -1) {
    -      return false
    -    }
     
    -    /*
    -     * Fetch actual chunks of data. Note that all these chunks are stored in
    -     * the BlockManager and reported to the master, so that other executors
    -     * can find out and pull the chunks from this executor.
    -     */
    -    val recvOrder = new Random().shuffle(Array.iterate(0, totalBlocks)(_ + 1).toList)
    -    for (pid <- recvOrder) {
    -      val pieceId = BroadcastBlockId(id, "piece" + pid)
    -      SparkEnv.get.blockManager.getSingle(pieceId) match {
    -        case Some(x) =>
    -          arrayOfBlocks(pid) = x.asInstanceOf[TorrentBlock]
    -          hasBlocks += 1
    +          _value = TorrentBroadcast.unBlockifyObject[T](blocks)
    +          // Store the merged copy in BlockManager so other tasks on this executor doesn't
    --- End diff --
    
    nit: doesn't -> don't




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2030#discussion_r16400670
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -27,41 +29,87 @@ import org.apache.spark.io.CompressionCodec
     import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
     
     /**
    - *  A [[org.apache.spark.broadcast.Broadcast]] implementation that uses a BitTorrent-like
    - *  protocol to do a distributed transfer of the broadcasted data to the executors.
    - *  The mechanism is as follows. The driver divides the serializes the broadcasted data,
    - *  divides it into smaller chunks, and stores them in the BlockManager of the driver.
    - *  These chunks are reported to the BlockManagerMaster so that all the executors can
    - *  learn the location of those chunks. The first time the broadcast variable (sent as
    - *  part of task) is deserialized at a executor, all the chunks are fetched using
    - *  the BlockManager. When all the chunks are fetched (initially from the driver's
    - *  BlockManager), they are combined and deserialized to recreate the broadcasted data.
    - *  However, the chunks are also stored in the BlockManager and reported to the
    - *  BlockManagerMaster. As more executors fetch the chunks, BlockManagerMaster learns
    - *  multiple locations for each chunk. Hence, subsequent fetches of each chunk will be
    - *  made to other executors who already have those chunks, resulting in a distributed
    - *  fetching. This prevents the driver from being the bottleneck in sending out multiple
    - *  copies of the broadcast data (one per executor) as done by the
    - *  [[org.apache.spark.broadcast.HttpBroadcast]].
    + * A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]].
    + *
    + * The mechanism is as follows:
    + *
    + * The driver divides the serialized object into small chunks and
    + * stores those chunks in the BlockManager of the driver.
    + *
    + * On each executor, the executor first attempts to fetch the object from its BlockManager. If
    + * it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or
    + * other executors if available. Once it gets the chunks, it puts the chunks in its own
    + * BlockManager, ready for other executors to fetch from.
    + *
    + * This prevents the driver from being the bottleneck in sending out multiple copies of the
    + * broadcast data (one per executor) as done by the [[org.apache.spark.broadcast.HttpBroadcast]].
    + *
    + * @param obj object to broadcast
    + * @param isLocal whether Spark is running in local mode (single JVM process).
    + * @param id A unique identifier for the broadcast variable.
      */
     private[spark] class TorrentBroadcast[T: ClassTag](
    -    @transient var value_ : T, isLocal: Boolean, id: Long)
    +    obj : T,
    +    @transient private val isLocal: Boolean,
    +    id: Long)
       extends Broadcast[T](id) with Logging with Serializable {
     
    -  override protected def getValue() = value_
    +  /**
    +   * Value of the broadcast object. On driver, this is set directly by the constructor.
    +   * On executors, this is reconstructed by [[readObject]], which builds this value by reading
    +   * blocks from the driver and/or other executors.
    +   */
    +  @transient private var _value: T = obj
    +
    +  /** Total number of blocks this broadcast variable contains. */
    +  private val numBlocks: Int = writeBlocks()
     
       private val broadcastId = BroadcastBlockId(id)
     
    -  SparkEnv.get.blockManager.putSingle(
    -    broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
    +  override protected def getValue() = _value
     
    -  @transient private var arrayOfBlocks: Array[TorrentBlock] = null
    -  @transient private var totalBlocks = -1
    -  @transient private var totalBytes = -1
    -  @transient private var hasBlocks = 0
    +  /**
    +   * Divide the object into multiple blocks and put those blocks in the block manager.
    +   *
    +   * @return number of blocks this broadcast variable is divided into
    +   */
    +  private def writeBlocks(): Int = {
    +    val blocks = TorrentBroadcast.blockifyObject(_value)
    +    blocks.zipWithIndex.foreach { case (block, i) =>
    +      // TODO: Use putBytes directly.
    +      SparkEnv.get.blockManager.putSingle(
    +        BroadcastBlockId(id, "piece" + i),
    +        block,
    +        StorageLevel.MEMORY_AND_DISK_SER,
    +        tellMaster = true)
    +    }
    +    blocks.length
    +  }
     
    -  if (!isLocal) {
    -    sendBroadcast()
    +  /** Fetch torrent blocks from the driver and/or other executors. */
    +  private def readBlocks(): Array[Array[Byte]] = {
    +    // Fetch chunks of data. Note that all these chunks are stored in the BlockManager and reported
    +    // to the driver, so other executors can pull these chunks from this executor as well.
    +    var numBlocksAvailable = 0
    +    val blocks = new Array[Array[Byte]](numBlocks)
    +
    +    for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
    +      val pieceId = BroadcastBlockId(id, "piece" + pid)
    +      SparkEnv.get.blockManager.getSingle(pieceId) match {
    +        case Some(x) =>
    +          blocks(pid) = x.asInstanceOf[Array[Byte]]
    +          numBlocksAvailable += 1
    +          SparkEnv.get.blockManager.putBytes(
    --- End diff --
    
    note to self - this is a bug
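
The `readBlocks` loop quoted above visits pieces in a random order, which spreads requests across whichever peers already hold a piece. A minimal sketch of that fetch pattern follows. It assumes an in-memory `Map` as a stand-in for the BlockManager; `PieceStore` and `ShuffledFetchSketch` are hypothetical names, and the sketch makes no claim about what the bug noted above is.

```scala
import scala.util.Random

object ShuffledFetchSketch {
  // Hypothetical stand-in for a remote block store: piece index -> bytes.
  type PieceStore = Map[Int, Array[Byte]]

  /** Fetch all pieces in random order; fail loudly if any piece is missing. */
  def readBlocks(store: PieceStore, numBlocks: Int): Array[Array[Byte]] = {
    val blocks = new Array[Array[Byte]](numBlocks)
    for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
      store.get(pid) match {
        // Each piece lands at its own index, so fetch order does not matter.
        case Some(bytes) => blocks(pid) = bytes
        case None => throw new IllegalStateException(s"Failed to get piece $pid")
      }
    }
    blocks
  }
}
```

Because every piece is written to `blocks(pid)`, the reassembled array is the same regardless of the order in which pieces arrive.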




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2030#discussion_r16432179
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -18,50 +18,116 @@
     package org.apache.spark.broadcast
     
     import java.io._
    +import java.nio.ByteBuffer
     
    +import scala.collection.JavaConversions.asJavaEnumeration
     import scala.reflect.ClassTag
     import scala.util.Random
     
     import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
     import org.apache.spark.io.CompressionCodec
     import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
    +import org.apache.spark.util.ByteBufferInputStream
     
     /**
    - *  A [[org.apache.spark.broadcast.Broadcast]] implementation that uses a BitTorrent-like
    - *  protocol to do a distributed transfer of the broadcasted data to the executors.
    - *  The mechanism is as follows. The driver divides the serializes the broadcasted data,
    - *  divides it into smaller chunks, and stores them in the BlockManager of the driver.
    - *  These chunks are reported to the BlockManagerMaster so that all the executors can
    - *  learn the location of those chunks. The first time the broadcast variable (sent as
    - *  part of task) is deserialized at a executor, all the chunks are fetched using
    - *  the BlockManager. When all the chunks are fetched (initially from the driver's
    - *  BlockManager), they are combined and deserialized to recreate the broadcasted data.
    - *  However, the chunks are also stored in the BlockManager and reported to the
    - *  BlockManagerMaster. As more executors fetch the chunks, BlockManagerMaster learns
    - *  multiple locations for each chunk. Hence, subsequent fetches of each chunk will be
    - *  made to other executors who already have those chunks, resulting in a distributed
    - *  fetching. This prevents the driver from being the bottleneck in sending out multiple
    - *  copies of the broadcast data (one per executor) as done by the
    - *  [[org.apache.spark.broadcast.HttpBroadcast]].
    + * A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]].
    + *
    + * The mechanism is as follows:
    + *
    + * The driver divides the serialized object into small chunks and
    + * stores those chunks in the BlockManager of the driver.
    + *
    + * On each executor, the executor first attempts to fetch the object from its BlockManager. If
    + * it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or
    + * other executors if available. Once it gets the chunks, it puts the chunks in its own
    + * BlockManager, ready for other executors to fetch from.
    + *
    + * This prevents the driver from being the bottleneck in sending out multiple copies of the
    + * broadcast data (one per executor) as done by the [[org.apache.spark.broadcast.HttpBroadcast]].
    + *
    + * @param obj object to broadcast
    + * @param isLocal whether Spark is running in local mode (single JVM process).
    + * @param id A unique identifier for the broadcast variable.
      */
     private[spark] class TorrentBroadcast[T: ClassTag](
    -    @transient var value_ : T, isLocal: Boolean, id: Long)
    +    obj : T,
    +    @transient private val isLocal: Boolean,
    +    id: Long)
       extends Broadcast[T](id) with Logging with Serializable {
     
    -  override protected def getValue() = value_
    +  /**
    +   * Value of the broadcast object. On driver, this is set directly by the constructor.
    +   * On executors, this is reconstructed by [[readObject]], which builds this value by reading
    +   * blocks from the driver and/or other executors.
    +   */
    +  @transient private var _value: T = obj
    --- End diff --
    
    or is there some other reason for it ?
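
The Scaladoc in the quoted diff says `_value` is set by the constructor on the driver and rebuilt by `readObject` on executors. The sketch below shows that general Java-serialization pattern in isolation. `Holder` and `TransientSketch` are hypothetical names, and rebuilding the value from the id is only a stand-in for fetching blocks.

```scala
import java.io._

// Hypothetical holder: only `id` is serialized; `_value` is rebuilt on read.
class Holder(obj: String, val id: Long) extends Serializable {
  // @transient keeps the (possibly large) value out of the serialized form.
  @transient private var _value: String = obj

  def value: String = _value

  // Invoked reflectively by Java serialization on the receiving side.
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    // Stand-in for fetching blocks: rebuild the value from the id.
    _value = "rebuilt-" + id
  }
}

object TransientSketch {
  /** Serialize and deserialize a Holder through an in-memory buffer. */
  def roundTrip(h: Holder): Holder = {
    val buf = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buf)
    out.writeObject(h)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
    try in.readObject().asInstanceOf[Holder] finally in.close()
  }
}
```

After a round trip the copy carries the same `id`, while its value comes from `readObject` rather than from the serialized bytes.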




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2030#discussion_r16432043
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -18,50 +18,116 @@
     package org.apache.spark.broadcast
     
     import java.io._
    +import java.nio.ByteBuffer
     
    +import scala.collection.JavaConversions.asJavaEnumeration
     import scala.reflect.ClassTag
     import scala.util.Random
     
     import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
     import org.apache.spark.io.CompressionCodec
     import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
    +import org.apache.spark.util.ByteBufferInputStream
     
     /**
    - *  A [[org.apache.spark.broadcast.Broadcast]] implementation that uses a BitTorrent-like
    - *  protocol to do a distributed transfer of the broadcasted data to the executors.
    - *  The mechanism is as follows. The driver divides the serializes the broadcasted data,
    - *  divides it into smaller chunks, and stores them in the BlockManager of the driver.
    - *  These chunks are reported to the BlockManagerMaster so that all the executors can
    - *  learn the location of those chunks. The first time the broadcast variable (sent as
    - *  part of task) is deserialized at a executor, all the chunks are fetched using
    - *  the BlockManager. When all the chunks are fetched (initially from the driver's
    - *  BlockManager), they are combined and deserialized to recreate the broadcasted data.
    - *  However, the chunks are also stored in the BlockManager and reported to the
    - *  BlockManagerMaster. As more executors fetch the chunks, BlockManagerMaster learns
    - *  multiple locations for each chunk. Hence, subsequent fetches of each chunk will be
    - *  made to other executors who already have those chunks, resulting in a distributed
    - *  fetching. This prevents the driver from being the bottleneck in sending out multiple
    - *  copies of the broadcast data (one per executor) as done by the
    - *  [[org.apache.spark.broadcast.HttpBroadcast]].
    + * A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]].
    + *
    + * The mechanism is as follows:
    + *
    + * The driver divides the serialized object into small chunks and
    + * stores those chunks in the BlockManager of the driver.
    + *
    + * On each executor, the executor first attempts to fetch the object from its BlockManager. If
    + * it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or
    + * other executors if available. Once it gets the chunks, it puts the chunks in its own
    + * BlockManager, ready for other executors to fetch from.
    + *
    + * This prevents the driver from being the bottleneck in sending out multiple copies of the
    + * broadcast data (one per executor) as done by the [[org.apache.spark.broadcast.HttpBroadcast]].
    + *
    + * @param obj object to broadcast
    + * @param isLocal whether Spark is running in local mode (single JVM process).
    + * @param id A unique identifier for the broadcast variable.
      */
     private[spark] class TorrentBroadcast[T: ClassTag](
    -    @transient var value_ : T, isLocal: Boolean, id: Long)
    +    obj : T,
    --- End diff --
    
    obj is not a field here
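
The remark above relies on a Scala rule: a class parameter declared without `val` or `var` becomes a field only if a method refers to it. A parameter used solely in initializers is a plain constructor argument. The sketch below checks this with reflection; `InitOnly`, `UsedInMethod`, and `FieldSketch` are hypothetical names and not part of the patch.

```scala
// `obj` is used only during construction, so scalac emits no field for it.
class InitOnly(obj: String) {
  val length: Int = obj.length
}

// `obj` is referenced from a method, so scalac keeps it as a private field.
class UsedInMethod(obj: String) {
  def show: String = obj
}

object FieldSketch {
  /** Names of the JVM fields declared directly on the given class. */
  def fieldNames(c: Class[_]): Set[String] =
    c.getDeclaredFields.map(_.getName).toSet
}
```

Since Java serialization writes fields, a parameter that never becomes a field is not written out with the object.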




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by shivaram <gi...@git.apache.org>.
Github user shivaram commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52598348
  
    @rxin -- Nice work reducing this to 2 RPCs. The patch looks good in terms of maintaining the same functionality as before. I'll wait for the Snappy fix and for Jenkins and then take another look.
    Also, it would be good to have results from spark-perf (Thanks @JoshRosen !).





[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52673575
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18857/consoleFull) for   PR 2030 at commit [`5bacb9d`](https://github.com/apache/spark/commit/5bacb9dbfab4ae5c83eb1874bd6fd6ae87ff4ad6).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2030#discussion_r16434945
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -109,99 +155,30 @@ private[spark] class TorrentBroadcast[T: ClassTag](
       private def readObject(in: ObjectInputStream) {
         in.defaultReadObject()
         TorrentBroadcast.synchronized {
    --- End diff --
    
    anyway in the future we should do finer grained locking.
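
The quoted `readObject` takes a single lock on the `TorrentBroadcast` companion, so reads of unrelated broadcast variables on one executor serialize behind each other. One possible shape for finer-grained locking is a lock per broadcast id, sketched below. The thread does not say which granularity is intended, so the per-id choice is an assumption, and `PerIdLockSketch` and its methods are hypothetical names.

```scala
import java.util.concurrent.ConcurrentHashMap

object PerIdLockSketch {
  // One lock object per broadcast id, created on first use.
  private val locks = new ConcurrentHashMap[Long, Object]()

  private def lockFor(id: Long): Object = {
    val fresh = new Object
    // putIfAbsent returns null when `fresh` was installed, else the existing lock.
    val existing = locks.putIfAbsent(id, fresh)
    if (existing == null) fresh else existing
  }

  /** Run `body` while holding only the lock for `id`. */
  def withLock[A](id: Long)(body: => A): A = lockFor(id).synchronized(body)
}
```

A caller would wrap the read path as `PerIdLockSketch.withLock(id) { ... }`. This sketch never removes entries from the map, which a real implementation would need to handle when a broadcast is destroyed.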




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52600292
  
    Ok I pushed a new version that should've addressed all the comments and fixed the bug.




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by mosharaf <gi...@git.apache.org>.
Github user mosharaf commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52689216
  
    Looks good to me. 
    
    Re: one of the earlier comments about broadcasting small objects through TorrentBroadcast. A not-so-intrusive way would be to piggyback the data on top of the first control message when the data is small. Then small blocks will behave like http, but bigger ones will become Torrent.
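
The piggybacking idea above amounts to a size check at broadcast time. A rough sketch follows. The `Inline` and `Pieces` types, the `choosePayload` function, and the 4096-byte threshold are all hypothetical, since the thread proposes no concrete cutoff or message format.

```scala
object PiggybackSketch {
  sealed trait Payload
  // Small objects travel inside the first control message.
  case class Inline(bytes: Array[Byte]) extends Payload
  // Large objects are announced by piece count and fetched piece by piece.
  case class Pieces(numBlocks: Int) extends Payload

  // Hypothetical cutoff; the thread does not propose a value.
  val InlineThresholdBytes = 4096

  /** Decide how a serialized object of the given size would be shipped. */
  def choosePayload(serialized: Array[Byte], blockSize: Int): Payload =
    if (serialized.length <= InlineThresholdBytes) Inline(serialized)
    else Pieces((serialized.length + blockSize - 1) / blockSize) // ceiling division
}
```

With this split, a small closure costs no block fetches beyond the control message, while larger objects still benefit from distributed fetching.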




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52695504
  
    [SPARK-3115](https://issues.apache.org/jira/browse/SPARK-3115) is an umbrella issue to improve task broadcast latency for small tasks; Reynold has created a bunch of subtasks to track future improvements, including the multi-threading improvements.




[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52691643
  
    That's a great idea. @JoshRosen and I were just talking about this before lunch!





[GitHub] spark pull request: [SPARK-3119] Re-implementation of TorrentBroad...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/2030#issuecomment-52663723
  
    Testing again to make sure tests pass two times in a row.

