Posted to user@spark.apache.org by Benyi Wang <be...@gmail.com> on 2014/11/04 21:23:46 UTC

Best practice for join

I need to join RDD[A], RDD[B], and RDD[C]. Here is what I did,

// build (K, V) pairs from A and B to prepare the join

val ja = A.map(r => (K1, Va))
val jb = B.map(r => (K1, Vb))

// join A and B

val jab = ja.join(jb)

// build (K, V) pairs from the joined result of A and B to prepare joining with C

val jc = C.map(r => (K2, Vc))
jab.join(jc).map(r => (K, V)).reduceByKey(_ + _)

Because A may have multiple fields, Va is a tuple with more than 2 fields.
It is said that Scala tuples may not be specialized, which leads to
boxing/unboxing, so I tried using case classes for Va, Vb, and Vc, and for
K2 and K, which are compound keys. V is a pair of count and ratio, and
_ + _ creates a new ratio. I registered those case classes with Kryo.
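
For illustration, the value type described above could be sketched roughly as
follows. The name CountRatio, its fields, and the count-weighted rule for
combining two ratios are assumptions made for this sketch; the post does not
say how the new ratio is computed.

```scala
// Hypothetical value type: the names and the merge rule are assumptions, not from the post.
final case class CountRatio(count: Long, ratio: Double) {
  // Merge two partial results; the ratio is assumed to be a count-weighted average.
  def +(that: CountRatio): CountRatio = {
    val total = count + that.count
    if (total == 0L) CountRatio(0L, 0.0)
    else CountRatio(total, (count * ratio + that.count * that.ratio) / total)
  }
}

object CountRatioExample {
  def main(args: Array[String]): Unit = {
    val parts = Seq(CountRatio(2L, 0.5), CountRatio(6L, 0.25))
    // Same shape as reduceByKey(_ + _), applied here to a local collection.
    val merged = parts.reduce(_ + _)
    println(merged)
  }
}
```

Note that in a sketch like this every call to + allocates a new CountRatio
instance.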

Shuffle read/write sizes look smaller, but I found that GC overhead is
really high: GC time is about 20-30% of the duration of the reduceByKey
tasks. I think a lot of new objects are being created from the case classes
during map/reduce.

How can I make this better?

Re: Best practice for join

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
Oh, in that case, if you want to reduce the GC time, you can specify the
level of parallelism (the number of partitions) in your join and
reduceByKey operations.

Thanks
Best Regards
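
As a rough sketch of the suggestion above: in the RDD API, both join and
reduceByKey accept an explicit number of partitions as an extra argument.
The object name, the sample records, the way the intermediate result is
re-keyed, and the partition count used here are placeholders invented for
this example, not taken from the thread.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD implicits, needed on Spark 1.0.x

object JoinWithParallelism {
  // Runs a three-way join followed by a reduce, passing numPartitions to each shuffle.
  def pipeline(sc: SparkContext, numPartitions: Int): Array[(String, Double)] = {
    // Placeholder inputs standing in for ja, jb, and jc (hypothetical data).
    val ja = sc.parallelize(Seq((1, "a1"), (2, "a2")))   // keyed by K1
    val jb = sc.parallelize(Seq((1, 10), (2, 20)))       // keyed by K1
    val jc = sc.parallelize(Seq((10, 1.0), (20, 2.0)))   // keyed by K2

    val jab = ja.join(jb, numPartitions)
    // Re-key the joined rows by K2 (assumed here to be the value from B).
    val byK2 = jab.map { case (_, (va, vb)) => (vb, va) }

    byK2.join(jc, numPartitions)
      .map { case (_, (va, vc)) => (va.take(1), vc) }    // final key K, value V
      .reduceByKey(_ + _, numPartitions)
      .collect()
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("join-parallelism-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // 8 is an arbitrary value; tune it for the actual data size and cluster.
      pipeline(sc, 8).foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

With more partitions each task handles less data at a time, which may ease
GC pressure; the right value depends on the data and the cluster.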

On Wed, Nov 5, 2014 at 1:11 PM, Benyi Wang <be...@gmail.com> wrote:

> I'm using Spark 1.0.0 in CDH 5.1.0. The big problem is that Spark SQL
> doesn't support hash joins in this version.

Re: Best practice for join

Posted by Benyi Wang <be...@gmail.com>.
I'm using Spark 1.0.0 in CDH 5.1.0. The big problem is that Spark SQL
doesn't support hash joins in this version.

On Tue, Nov 4, 2014 at 10:54 PM, Akhil Das <ak...@sigmoidanalytics.com>
wrote:

> How about using Spark SQL <https://spark.apache.org/sql/>?
>
> Thanks
> Best Regards

Re: Best practice for join

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
How about using Spark SQL <https://spark.apache.org/sql/>?

Thanks
Best Regards
