Posted to user@spark.apache.org by Hemminger Jeff <je...@atware.co.jp> on 2015/08/28 12:39:27 UTC

Alternative to Large Broadcast Variables

Hi,

I am working on a Spark application that uses a large (~3 GB)
broadcast variable as a lookup table. The application refines the data in
this lookup table in an iterative manner. So this large variable is
broadcast many times during the lifetime of the application process.

From what I have observed perhaps 60% of the execution time is spent
waiting for the variable to broadcast in each iteration. My reading of a
Spark performance article[1] suggests that the time spent broadcasting will
increase with the number of nodes I add.

My question for the group - what would you suggest as an alternative to
broadcasting a large variable like this?

One approach I have considered is segmenting my RDD and adding a copy of
the lookup table for every X values to process. So, for example, if I have
a list of 1 million entries to process (e.g., RDD[Entry]), I could split this
into segments of 100K entries, with a copy of the lookup table, and make
that an RDD[(Lookup, Array[Entry])].
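
Roughly, a sketch of what I have in mind (Entry, Lookup and the segment size
here are stand-ins for my real types and tuning):

    import org.apache.spark.rdd.RDD

    // Placeholders for my real types.
    case class Entry(id: Long, value: Double)
    case class Lookup(table: Map[Long, Double])

    // Pair each segment of roughly `segmentSize` entries with its own copy
    // of the lookup table.
    def segment(entries: RDD[Entry],
                lookup: Lookup,
                segmentSize: Long = 100000L): RDD[(Lookup, Array[Entry])] =
      entries
        .zipWithIndex()                                  // (entry, index)
        .map { case (entry, idx) => (idx / segmentSize, entry) }
        .groupByKey()                                    // one group per segment
        .map { case (_, group) => (lookup, group.toArray) }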

Another solution I am looking at is making the lookup table an RDD
instead of a broadcast variable. Perhaps I could use an IndexedRDD[2] to
improve performance. One issue with this approach is that I would have to
rewrite my application code to use two RDDs so that I do not reference the
lookup RDD from within the closure of another RDD.
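
For example, since an RDD cannot be referenced from inside another RDD's
closure, I believe the code would have to be restructured into a join along
these lines (the key and value types are placeholders):

    import org.apache.spark.rdd.RDD

    // Placeholder types for illustration.
    case class Entry(key: Long, payload: String)
    case class LookupValue(data: Double)

    // Not allowed: entries.map(e => lookupRdd.lookup(e.key)) -- an RDD cannot
    // be used inside another RDD's closure. Key both sides and join instead:
    def refine(entries: RDD[Entry],
               lookup: RDD[(Long, LookupValue)]): RDD[(Entry, LookupValue)] =
      entries
        .keyBy(_.key)        // RDD[(Long, Entry)]
        .join(lookup)        // RDD[(Long, (Entry, LookupValue))]
        .values              // RDD[(Entry, LookupValue)]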

Any other recommendations?

Jeff


[1]
http://www.cs.berkeley.edu/~agearh/cs267.sp10/files/mosharaf-spark-bc-report-spring10.pdf

[2] https://github.com/amplab/spark-indexedrdd

Re: Alternative to Large Broadcast Variables

Posted by Hemminger Jeff <je...@atware.co.jp>.
Thanks for the recommendations. I had been focused on solving the problem
"within Spark" but a distributed database sounds like a better solution.

Jeff

On Sat, Aug 29, 2015 at 11:47 PM, Ted Yu <yu...@gmail.com> wrote:

> Not sure if the race condition you mentioned is related to Cassandra's
> data consistency model.
>
> If HBase is used as the external key value store, atomicity is guaranteed.
>
> Cheers
>
> On Sat, Aug 29, 2015 at 7:40 AM, Raghavendra Pandey <
> raghavendra.pandey@gmail.com> wrote:
>
>> We are using Cassandra for a similar kind of problem and it works well...
>> You need to take care of the race condition between updating the store and
>> looking up the store...
>> On Aug 29, 2015 1:31 AM, "Ted Yu" <yu...@gmail.com> wrote:
>>
>>> +1 on Jason's suggestion.
>>>
>>> bq. this large variable is broadcast many times during the lifetime
>>>
>>> Please consider making this large variable more granular. Meaning,
>>> reduce the amount of data transferred between the key value store and
>>> your app during update.
>>>
>>> Cheers
>>>
>>> On Fri, Aug 28, 2015 at 12:44 PM, Jason <Ja...@jasonknight.us> wrote:
>>>
>>>> You could try using an external key value store (like HBase, Redis) and
>>>> perform lookups/updates inside of your mappers (you'd need to create the
>>>> connection within a mapPartitions code block to avoid the connection
>>>> setup/teardown overhead)?
>>>>
>>>> I haven't done this myself though, so I'm just throwing the idea out
>>>> there.
>>>>
>>>> On Fri, Aug 28, 2015 at 3:39 AM Hemminger Jeff <je...@atware.co.jp>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am working on a Spark application that uses a large (~3 GB)
>>>>> broadcast variable as a lookup table. The application refines the data in
>>>>> this lookup table in an iterative manner. So this large variable is
>>>>> broadcast many times during the lifetime of the application process.
>>>>>
>>>>> From what I have observed perhaps 60% of the execution time is spent
>>>>> waiting for the variable to broadcast in each iteration. My reading of a
>>>>> Spark performance article[1] suggests that the time spent broadcasting will
>>>>> increase with the number of nodes I add.
>>>>>
>>>>> My question for the group - what would you suggest as an alternative
>>>>> to broadcasting a large variable like this?
>>>>>
>>>>> One approach I have considered is segmenting my RDD and adding a copy of
>>>>> the lookup table for every X values to process. So, for example, if I have
>>>>> a list of 1 million entries to process (e.g., RDD[Entry]), I could split this
>>>>> into segments of 100K entries, with a copy of the lookup table, and make
>>>>> that an RDD[(Lookup, Array[Entry])].
>>>>>
>>>>> Another solution I am looking at is making the lookup table an RDD
>>>>> instead of a broadcast variable. Perhaps I could use an IndexedRDD[2] to
>>>>> improve performance. One issue with this approach is that I would have to
>>>>> rewrite my application code to use two RDDs so that I do not reference the
>>>>> lookup RDD from within the closure of another RDD.
>>>>>
>>>>> Any other recommendations?
>>>>>
>>>>> Jeff
>>>>>
>>>>>
>>>>> [1]
>>>>> http://www.cs.berkeley.edu/~agearh/cs267.sp10/files/mosharaf-spark-bc-report-spring10.pdf
>>>>>
>>>>> [2]https://github.com/amplab/spark-indexedrdd
>>>>>
>>>>
>>>
>

Re: Alternative to Large Broadcast Variables

Posted by Ted Yu <yu...@gmail.com>.
Not sure if the race condition you mentioned is related to Cassandra's data
consistency model.

If HBase is used as the external key value store, atomicity is guaranteed.

Cheers

On Sat, Aug 29, 2015 at 7:40 AM, Raghavendra Pandey <
raghavendra.pandey@gmail.com> wrote:

> We are using Cassandra for a similar kind of problem and it works well...
> You need to take care of the race condition between updating the store and
> looking up the store...
> On Aug 29, 2015 1:31 AM, "Ted Yu" <yu...@gmail.com> wrote:
>
>> +1 on Jason's suggestion.
>>
>> bq. this large variable is broadcast many times during the lifetime
>>
>> Please consider making this large variable more granular. Meaning, reduce
>> the amount of data transferred between the key value store and your app
>> during update.
>>
>> Cheers
>>
>> On Fri, Aug 28, 2015 at 12:44 PM, Jason <Ja...@jasonknight.us> wrote:
>>
>>> You could try using an external key value store (like HBase, Redis) and
>>> perform lookups/updates inside of your mappers (you'd need to create the
>>> connection within a mapPartitions code block to avoid the connection
>>> setup/teardown overhead)?
>>>
>>> I haven't done this myself though, so I'm just throwing the idea out
>>> there.
>>>
>>> On Fri, Aug 28, 2015 at 3:39 AM Hemminger Jeff <je...@atware.co.jp>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am working on a Spark application that uses a large (~3 GB)
>>>> broadcast variable as a lookup table. The application refines the data in
>>>> this lookup table in an iterative manner. So this large variable is
>>>> broadcast many times during the lifetime of the application process.
>>>>
>>>> From what I have observed perhaps 60% of the execution time is spent
>>>> waiting for the variable to broadcast in each iteration. My reading of a
>>>> Spark performance article[1] suggests that the time spent broadcasting will
>>>> increase with the number of nodes I add.
>>>>
>>>> My question for the group - what would you suggest as an alternative to
>>>> broadcasting a large variable like this?
>>>>
>>>> One approach I have considered is segmenting my RDD and adding a copy of
>>>> the lookup table for every X values to process. So, for example, if I have
>>>> a list of 1 million entries to process (e.g., RDD[Entry]), I could split this
>>>> into segments of 100K entries, with a copy of the lookup table, and make
>>>> that an RDD[(Lookup, Array[Entry])].
>>>>
>>>> Another solution I am looking at is making the lookup table an RDD
>>>> instead of a broadcast variable. Perhaps I could use an IndexedRDD[2] to
>>>> improve performance. One issue with this approach is that I would have to
>>>> rewrite my application code to use two RDDs so that I do not reference the
>>>> lookup RDD from within the closure of another RDD.
>>>>
>>>> Any other recommendations?
>>>>
>>>> Jeff
>>>>
>>>>
>>>> [1]
>>>> http://www.cs.berkeley.edu/~agearh/cs267.sp10/files/mosharaf-spark-bc-report-spring10.pdf
>>>>
>>>> [2]https://github.com/amplab/spark-indexedrdd
>>>>
>>>
>>

Re: Alternative to Large Broadcast Variables

Posted by Raghavendra Pandey <ra...@gmail.com>.
We are using Cassandra for a similar kind of problem and it works well... You
need to take care of the race condition between updating the store and
looking up the store...
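
For example, one way to avoid that race (sketched against a made-up
VersionedStore interface, not a real driver API) is to write each refinement
pass under a new version and flip a "current" pointer only after all the
writes have landed:

    // Sketch only: VersionedStore is hypothetical, not a real Cassandra client.
    trait VersionedStore {
      def currentVersion: Long
      def write(version: Long, key: Long, value: Double): Unit
      def publish(version: Long): Unit   // flip the "current" pointer
      def read(version: Long, key: Long): Option[Double]
    }

    def refreshLookup(store: VersionedStore,
                      refined: Iterator[(Long, Double)]): Unit = {
      val next = store.currentVersion + 1
      refined.foreach { case (k, v) => store.write(next, k, v) }  // readers still on old version
      store.publish(next)                                         // switch readers in one step
    }

Readers resolve the current version before each lookup, so they never see a
partially updated table.
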
On Aug 29, 2015 1:31 AM, "Ted Yu" <yu...@gmail.com> wrote:

> +1 on Jason's suggestion.
>
> bq. this large variable is broadcast many times during the lifetime
>
> Please consider making this large variable more granular. Meaning, reduce
> the amount of data transferred between the key value store and your app
> during update.
>
> Cheers
>
> On Fri, Aug 28, 2015 at 12:44 PM, Jason <Ja...@jasonknight.us> wrote:
>
>> You could try using an external key value store (like HBase, Redis) and
>> perform lookups/updates inside of your mappers (you'd need to create the
>> connection within a mapPartitions code block to avoid the connection
>> setup/teardown overhead)?
>>
>> I haven't done this myself though, so I'm just throwing the idea out
>> there.
>>
>> On Fri, Aug 28, 2015 at 3:39 AM Hemminger Jeff <je...@atware.co.jp> wrote:
>>
>>> Hi,
>>>
>>> I am working on a Spark application that uses a large (~3 GB)
>>> broadcast variable as a lookup table. The application refines the data in
>>> this lookup table in an iterative manner. So this large variable is
>>> broadcast many times during the lifetime of the application process.
>>>
>>> From what I have observed perhaps 60% of the execution time is spent
>>> waiting for the variable to broadcast in each iteration. My reading of a
>>> Spark performance article[1] suggests that the time spent broadcasting will
>>> increase with the number of nodes I add.
>>>
>>> My question for the group - what would you suggest as an alternative to
>>> broadcasting a large variable like this?
>>>
>>> One approach I have considered is segmenting my RDD and adding a copy of
>>> the lookup table for every X values to process. So, for example, if I have
>>> a list of 1 million entries to process (e.g., RDD[Entry]), I could split this
>>> into segments of 100K entries, with a copy of the lookup table, and make
>>> that an RDD[(Lookup, Array[Entry])].
>>>
>>> Another solution I am looking at is making the lookup table an RDD
>>> instead of a broadcast variable. Perhaps I could use an IndexedRDD[2] to
>>> improve performance. One issue with this approach is that I would have to
>>> rewrite my application code to use two RDDs so that I do not reference the
>>> lookup RDD from within the closure of another RDD.
>>>
>>> Any other recommendations?
>>>
>>> Jeff
>>>
>>>
>>> [1]
>>> http://www.cs.berkeley.edu/~agearh/cs267.sp10/files/mosharaf-spark-bc-report-spring10.pdf
>>>
>>> [2]https://github.com/amplab/spark-indexedrdd
>>>
>>
>

Re: Alternative to Large Broadcast Variables

Posted by Ted Yu <yu...@gmail.com>.
+1 on Jason's suggestion.

bq. this large variable is broadcast many times during the lifetime

Please consider making this large variable more granular. Meaning, reduce
the amount of data transferred between the key value store and your app
during update.
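
For example, something along these lines, where KVClient is just a stand-in
for whatever HBase or Redis client is actually in use:

    // Sketch only: KVClient is a placeholder for the real store client.
    trait KVClient {
      def put(key: Long, value: Double): Unit
      def get(key: Long): Option[Double]
    }

    // Push only the keys that changed in this iteration instead of
    // re-shipping the whole ~3 GB table.
    def pushDelta(client: KVClient,
                  previous: Map[Long, Double],
                  refined: Map[Long, Double]): Int = {
      val changed = refined.filter { case (k, v) => previous.get(k) != Some(v) }
      changed.foreach { case (k, v) => client.put(k, v) }
      changed.size
    }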

Cheers

On Fri, Aug 28, 2015 at 12:44 PM, Jason <Ja...@jasonknight.us> wrote:

> You could try using an external key value store (like HBase, Redis) and
> perform lookups/updates inside of your mappers (you'd need to create the
> connection within a mapPartitions code block to avoid the connection
> setup/teardown overhead)?
>
> I haven't done this myself though, so I'm just throwing the idea out there.
>
> On Fri, Aug 28, 2015 at 3:39 AM Hemminger Jeff <je...@atware.co.jp> wrote:
>
>> Hi,
>>
>> I am working on a Spark application that uses a large (~3 GB)
>> broadcast variable as a lookup table. The application refines the data in
>> this lookup table in an iterative manner. So this large variable is
>> broadcast many times during the lifetime of the application process.
>>
>> From what I have observed perhaps 60% of the execution time is spent
>> waiting for the variable to broadcast in each iteration. My reading of a
>> Spark performance article[1] suggests that the time spent broadcasting will
>> increase with the number of nodes I add.
>>
>> My question for the group - what would you suggest as an alternative to
>> broadcasting a large variable like this?
>>
>> One approach I have considered is segmenting my RDD and adding a copy of
>> the lookup table for every X values to process. So, for example, if I have
>> a list of 1 million entries to process (e.g., RDD[Entry]), I could split this
>> into segments of 100K entries, with a copy of the lookup table, and make
>> that an RDD[(Lookup, Array[Entry])].
>>
>> Another solution I am looking at is making the lookup table an RDD
>> instead of a broadcast variable. Perhaps I could use an IndexedRDD[2] to
>> improve performance. One issue with this approach is that I would have to
>> rewrite my application code to use two RDDs so that I do not reference the
>> lookup RDD from within the closure of another RDD.
>>
>> Any other recommendations?
>>
>> Jeff
>>
>>
>> [1]
>> http://www.cs.berkeley.edu/~agearh/cs267.sp10/files/mosharaf-spark-bc-report-spring10.pdf
>>
>> [2]https://github.com/amplab/spark-indexedrdd
>>
>

Re: Alternative to Large Broadcast Variables

Posted by Jason <Ja...@jasonknight.us>.
You could try using an external key value store (like HBase, Redis) and
perform lookups/updates inside of your mappers (you'd need to create the
connection within a mapPartitions code block to avoid the connection
setup/teardown overhead)?

I haven't done this myself though, so I'm just throwing the idea out there.
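
If it helps, a rough sketch of that pattern (Entry and KVClient here are
placeholders for your real record type and store client):

    import org.apache.spark.rdd.RDD

    case class Entry(key: Long, payload: String)   // placeholder record type

    trait KVClient {
      def get(key: Long): Option[Double]
      def close(): Unit
    }

    // Stub: replace with a real HBase/Redis connection.
    def connect(): KVClient = new KVClient {
      def get(key: Long): Option[Double] = None
      def close(): Unit = ()
    }

    def withLookups(entries: RDD[Entry]): RDD[(Entry, Option[Double])] =
      entries.mapPartitions { partition =>
        val client = connect()       // one connection per partition, not per record
        val looked = partition.map(e => (e, client.get(e.key))).toList
        client.close()
        looked.iterator
      }

Materializing the partition's results (toList) before closing the connection
matters because the mapped iterator is lazy; otherwise the lookups would run
against a closed connection.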

On Fri, Aug 28, 2015 at 3:39 AM Hemminger Jeff <je...@atware.co.jp> wrote:

> Hi,
>
> I am working on a Spark application that uses a large (~3 GB)
> broadcast variable as a lookup table. The application refines the data in
> this lookup table in an iterative manner. So this large variable is
> broadcast many times during the lifetime of the application process.
>
> From what I have observed perhaps 60% of the execution time is spent
> waiting for the variable to broadcast in each iteration. My reading of a
> Spark performance article[1] suggests that the time spent broadcasting will
> increase with the number of nodes I add.
>
> My question for the group - what would you suggest as an alternative to
> broadcasting a large variable like this?
>
> One approach I have considered is segmenting my RDD and adding a copy of
> the lookup table for every X values to process. So, for example, if I have
> a list of 1 million entries to process (e.g., RDD[Entry]), I could split this
> into segments of 100K entries, with a copy of the lookup table, and make
> that an RDD[(Lookup, Array[Entry])].
>
> Another solution I am looking at is making the lookup table an RDD
> instead of a broadcast variable. Perhaps I could use an IndexedRDD[2] to
> improve performance. One issue with this approach is that I would have to
> rewrite my application code to use two RDDs so that I do not reference the
> lookup RDD from within the closure of another RDD.
>
> Any other recommendations?
>
> Jeff
>
>
> [1]
> http://www.cs.berkeley.edu/~agearh/cs267.sp10/files/mosharaf-spark-bc-report-spring10.pdf
>
> [2]https://github.com/amplab/spark-indexedrdd
>