Posted to user@spark.apache.org by AlexG <sw...@gmail.com> on 2015/08/05 20:54:50 UTC

spark hangs at broadcasting during a filter

I'm trying to load a 1 TB file whose lines i,j,v represent the entries of a
matrix, A_{ij} = v, so that I can convert it to a Parquet file. Only some
of the rows of A are relevant, so the following code first loads the
triplets as text, splits them into Tuple3[Int, Int, Double], drops triplets
whose rows should be skipped, then forms a Tuple2[Int, List[Tuple2[Int,
Double]]] for each row (if I'm judging datatypes correctly).

val valsrows = sc.textFile(valsinpath).map(_.split(",")).
                          map(x => (x(1).toInt, (x(0).toInt, x(2).toDouble))).
                          filter(x => !droprows.contains(x._1)).
                          groupByKey.
                          map(x => (x._1, x._2.toSeq.sortBy(_._1)))
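
To make the resulting shape concrete, here is a minimal sketch of the same parse, filter, group, and sort steps on plain Scala collections instead of an RDD. The object name, the helper names, and the sample lines are made up for illustration; the field order (field 1 as the key, field 0 paired with the value) is copied from the snippet above.

```scala
object TripletGrouping {
  // Parse one "i,j,v" line with the same field order as the snippet above:
  // field 1 becomes the key, field 0 is paired with the value.
  def parse(line: String): (Int, (Int, Double)) = {
    val x = line.split(",")
    (x(1).toInt, (x(0).toInt, x(2).toDouble))
  }

  // Local-collections analogue of filter + groupByKey + sortBy.
  def group(lines: Seq[String], droprows: Set[Int]): Map[Int, Seq[(Int, Double)]] =
    lines.map(parse)
      .filterNot { case (key, _) => droprows.contains(key) }
      .groupBy(_._1)
      .map { case (key, pairs) => key -> pairs.map(_._2).sortBy(_._1) }

  def main(args: Array[String]): Unit = {
    // Made-up sample data; key 2 is dropped.
    val sample = Seq("3,0,1.5", "1,0,2.5", "2,1,4.0", "0,2,9.0")
    group(sample, droprows = Set(2)).toSeq.sortBy(_._1).foreach(println)
  }
}
```

Each surviving key maps to a sequence of (index, value) pairs sorted by index, which is the Tuple2[Int, Seq[Tuple2[Int, Double]]] shape the RDD version should produce per key.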

Spark hangs during a broadcast that occurs during the filter step (according
to the Spark UI). The last two lines in the log before it pauses are:

15/08/05 18:50:10 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in
memory on 172.31.49.149:37643 (size: 4.6 KB, free: 113.8 GB)
15/08/05 18:50:10 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in
memory on 172.31.49.159:41846 (size: 4.6 KB, free: 113.8 GB)

I've left Spark running for up to 17 minutes, and it never continues past
this point. I'm using a cluster of 30 r3.8xlarge EC2 instances (244 GB,
32 cores each) with Spark in standalone mode, 220G of executor and driver
memory, and the KryoSerializer.

Any ideas on what could be causing this hang?



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-hangs-at-broadcasting-during-a-filter-tp24143.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: spark hangs at broadcasting during a filter

Posted by Alex Gittens <sw...@gmail.com>.
Thanks. Repartitioning to a smaller number of partitions seems to fix my
issue, but I'll keep broadcasting in mind (droprows is an integer array
with about 4 million entries).
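
The reply does not say how the repartitioning was done or what partition count was used. One possible way, shown here only as a sketch, is to call coalesce on the RDD returned by sc.textFile before the rest of the pipeline. The object name, the helper shrink, and the value of targetPartitions are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CoalesceSketch {
  // Reduce the partition count without a full shuffle. targetPartitions is a
  // hypothetical tuning knob; the thread does not say what value was used.
  def shrink(lines: RDD[String], targetPartitions: Int): RDD[String] =
    lines.coalesce(targetPartitions)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("coalesce-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Made-up sample lines standing in for sc.textFile(valsinpath).
      val lines = sc.parallelize(Seq("0,0,1.0", "1,0,2.0", "2,1,3.0", "3,1,4.0"), numSlices = 4)
      val fewer = shrink(lines, targetPartitions = 2)
      println(s"${lines.partitions.length} -> ${fewer.partitions.length} partitions")
    } finally {
      sc.stop()
    }
  }
}
```

coalesce merges existing partitions and avoids a shuffle when shrinking; repartition would also work but always shuffles the data.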

On Wed, Aug 5, 2015 at 12:34 PM, Philip Weaver <ph...@gmail.com>
wrote:

> How big is droprows?
>
> Try explicitly broadcasting it like this:
>
> val broadcastDropRows = sc.broadcast(droprows)
>
> val valsrows = ...
>     .filter(x => !broadcastDropRows.value.contains(x._1))
>
> - Philip
>
>

Re: spark hangs at broadcasting during a filter

Posted by Philip Weaver <ph...@gmail.com>.
How big is droprows?

Try explicitly broadcasting it like this:

val broadcastDropRows = sc.broadcast(droprows)

val valsrows = ...
    .filter(x => !broadcastDropRows.value.contains(x._1))

- Philip
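
A fuller sketch of the suggestion above, under some assumptions: droprows is taken to be an Array[Int] (as described elsewhere in the thread), and it is converted to a Set before broadcasting, because contains on an array is a linear scan while a hash set lookup is roughly constant time. The object name, the helper dropAndGroup, and the sample data are made up, and whether this change affects the hang itself is not established in the thread.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object BroadcastFilterSketch {
  // Broadcast the rows to drop once, as a Set, then run the original pipeline.
  def dropAndGroup(sc: SparkContext,
                   lines: RDD[String],
                   droprows: Array[Int]): RDD[(Int, Seq[(Int, Double)])] = {
    val dropSet = sc.broadcast(droprows.toSet)
    lines.map(_.split(","))
      .map(x => (x(1).toInt, (x(0).toInt, x(2).toDouble)))
      .filter(x => !dropSet.value.contains(x._1)) // read the broadcast value on executors
      .groupByKey()
      .map(x => (x._1, x._2.toSeq.sortBy(_._1)))
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("broadcast-filter-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Made-up sample lines standing in for sc.textFile(valsinpath).
      val lines = sc.parallelize(Seq("3,0,1.5", "1,0,2.5", "2,1,4.0", "0,2,9.0"))
      dropAndGroup(sc, lines, Array(2)).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

Without the explicit broadcast, droprows is captured in the filter closure and serialized along with each task; the broadcast variable is instead shipped to each executor once and reused across tasks.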

