Posted to user@spark.apache.org by Mosharaf Chowdhury <mo...@gmail.com> on 2014/01/22 09:58:49 UTC

Re: TorrentBroadcast + persist = bug

Milos, thanks for reporting. It seems that just initializing
TorrentBroadcast is causing the trouble (I don't see your code using a
broadcast at all).

I'll try to make some time to reproduce it tomorrow and let you know.



--
Mosharaf Chowdhury
http://www.mosharaf.com/


On Wed, Jan 22, 2014 at 12:22 AM, Milos Nikolic <mi...@gmail.com> wrote:

> Can anyone confirm this?
>
> On Jan 20, 2014, at 12:22 PM, Milos Nikolic <mi...@gmail.com> wrote:
>
> > Hello,
> >
> > I think there is a bug with TorrentBroadcast in the latest release
> > (0.8.1). Even a simple job (e.g., rdd.count) hangs waiting for some
> > tasks to finish. Here is how to reproduce the problem:
> >
> > 1) Configure Spark so that node X is both the master and one of the
> >    workers (e.g., 5 nodes => 5 workers and 1 master)
> > 2) Activate TorrentBroadcast
> > 3) Use the Kryo serializer (the problem occurs more often with it than
> >    with the Java serializer)
> > 4) Read some file from HDFS, persist the RDD, and call count
> >
> > In almost 80% of runs (~50% with the Java serializer), the count job
> > hangs waiting for two tasks from node X to finish. The problem *does not*
> > appear if: 1) I separate the master from the worker nodes, or 2) I use
> > HttpBroadcast, or 3) I do not persist the RDD.
> >
> > The code is below.
> >
> >  import org.apache.spark.SparkContext
> >
> >  def main(args: Array[String]): Unit = {
> >
> >    // Properties must be set before the SparkContext is created.
> >    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
> >    System.setProperty("spark.kryo.registrator", "test.MyRegistrator")
> >    System.setProperty("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")
> >
> >    val sc = new SparkContext(...)
> >
> >    val file = "hdfs://server:9000/user/xxx/Test.out"  // ~750MB
> >    val rdd = sc.textFile(file)
> >    rdd.persist()  // the hang is not seen when this call is removed
> >    println("Counting: " + rdd.count())
> >  }
> >
> >
> > Best regards,
> > Milos
>
>
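
The report above varies three things: the broadcast factory, the
serializer, and whether the RDD is persisted. A minimal sketch of how those
combinations could be enumerated for a systematic rerun follows. The
ReproMatrix object, the Variant case class, and the applyProperties helper
are hypothetical names introduced here for illustration and are not part of
the original report; only the property keys and class names come from the
thread or from Spark's stock serializer and broadcast factories.

```scala
// Hypothetical helper (not from the original report): lists every
// combination of the settings that the report says affect the hang.
object ReproMatrix {
  final case class Variant(broadcastFactory: String, serializer: String, persist: Boolean)

  val broadcastFactories: Seq[String] = Seq(
    "org.apache.spark.broadcast.TorrentBroadcastFactory",
    "org.apache.spark.broadcast.HttpBroadcastFactory")

  val serializers: Seq[String] = Seq(
    "org.apache.spark.serializer.KryoSerializer",
    "org.apache.spark.serializer.JavaSerializer")

  // 2 factories x 2 serializers x persist on/off = 8 variants.
  val variants: Seq[Variant] =
    for {
      b <- broadcastFactories
      s <- serializers
      p <- Seq(true, false)
    } yield Variant(b, s, p)

  // Sets the system properties for one variant. As in the report, this
  // would have to run before the SparkContext is constructed.
  def applyProperties(v: Variant): Unit = {
    System.setProperty("spark.broadcast.factory", v.broadcastFactory)
    System.setProperty("spark.serializer", v.serializer)
  }

  def main(args: Array[String]): Unit =
    variants.foreach(println)
}
```

Each variant would need its own fresh JVM and SparkContext, since the
properties are read when the context is created; the persist flag only
decides whether rdd.persist() is called before rdd.count().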

Re: TorrentBroadcast + persist = bug

Posted by Milos Nikolic <mi...@gmail.com>.
Here they are...


On Jan 22, 2014, at 10:10 AM, Mosharaf Chowdhury <mo...@gmail.com> wrote:

> Btw, can you please send me the master and worker logs?
> 
> --
> Mosharaf Chowdhury
> http://www.mosharaf.com/
> 
> 


Re: TorrentBroadcast + persist = bug

Posted by Mosharaf Chowdhury <mo...@gmail.com>.
Btw, can you please send me the master and worker logs?

--
Mosharaf Chowdhury
http://www.mosharaf.com/

