Posted to user@spark.apache.org by Patrick <ti...@gmail.com> on 2017/02/23 14:21:10 UTC

Shuffling on Dataframe to RDD conversion with a map transformation

Hi,

I was wondering why there are two stages (a shuffle write and a shuffle read)
at line 194, when I am converting the DataFrame obtained from the SQL query
into an RDD. This prevents the job from completing, and it doesn't scale to
TBs of data.

Also, I have set the shuffle fraction to 0.6 and the memory fraction to 0.2
while executing the job.
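
Assuming "shuffle fraction" and "memory fraction" refer to the legacy
spark.shuffle.memoryFraction and spark.storage.memoryFraction settings (an
assumption; the exact config keys are not named above), they would typically
be set along these lines, with illustrative values:

 // Assumption: the fractions above map to the legacy memory settings.
 val conf = new org.apache.spark.SparkConf()
   .set("spark.shuffle.memoryFraction", "0.6")
   .set("spark.storage.memoryFraction", "0.2")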

 val queryresult = hiveCtx.sql("select
 Name,Age,Product1,Product2,Product3,count(*), max(Month)  from test GROUP
BY Name,Age,Product1,Product2,Product3 GROUPING SETS
((Name,Age,Product1),(Name,Age,Product2),(Name,Age,Product3))")

line 194:

val resultrdd = queryresult.map(x => (x.get(0),
  (x.get(1), x.get(2), x.get(3), x.get(4), x.get(5))))




Any insights into the problem would be very helpful.

Thanks

Re: Shuffling on Dataframe to RDD conversion with a map transformation

Posted by Patrick <ti...@gmail.com>.
Hi,

In the above query, the GROUP BY creates a shuffle stage, and by default
Spark SQL uses 200 partitions for that shuffle stage.
This can be configured via spark.sql.shuffle.partitions.

By increasing the number of shuffle partitions, I was able to run this.
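
A minimal sketch of how that can be set (reusing the hiveCtx from the query
above; 2000 is only an illustrative value):

 // Raise the number of partitions Spark SQL uses for shuffle stages
 // (GROUP BY / GROUPING SETS, joins). The default is 200.
 hiveCtx.setConf("spark.sql.shuffle.partitions", "2000")

 // Equivalently at submit time:
 //   spark-submit --conf spark.sql.shuffle.partitions=2000 ...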

Thanks,


On Thu, Feb 23, 2017 at 9:15 PM, Yong Zhang <ja...@hotmail.com> wrote:

> It would be helpful if you printed the execution plan for your query here.
>
>
> Yong
>
>
> ------------------------------
> *From:* Patrick <ti...@gmail.com>
> *Sent:* Thursday, February 23, 2017 9:21 AM
> *To:* user
> *Subject:* Shuffling on Dataframe to RDD conversion with a map
> transformation
>
> Hi,
>
> I was wondering why there are two stages (a shuffle write and a shuffle read)
> at line 194, when I am converting the DataFrame obtained from the SQL query
> into an RDD. This prevents the job from completing, and it doesn't scale to
> TBs of data.
>
> Also, I have set the shuffle fraction to 0.6 and the memory fraction to 0.2
> while executing the job.
>
>  val queryresult = hiveCtx.sql("select  Name,Age,Product1,Product2,Product3,count(*),
> max(Month)  from test GROUP BY Name,Age,Product1,Product2,Product3
> GROUPING SETS ((Name,Age,Product1),(Name,Age,Product2),(Name,Age,
> Product3))")
>
> line 194:
>
> val resultrdd = queryresult.map(x => (x.get(0),
>   (x.get(1), x.get(2), x.get(3), x.get(4), x.get(5))))
>
>
>
>
> Any insights into the problem would be very helpful.
>
> Thanks
>
>

Re: Shuffling on Dataframe to RDD conversion with a map transformation

Posted by Yong Zhang <ja...@hotmail.com>.
It would be helpful if you printed the execution plan for your query here.
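
For example, something along these lines (assuming the queryresult DataFrame
from the original mail) prints the plans, including the Exchange (shuffle)
operators the planner inserts for the GROUP BY / GROUPING SETS:

 // Prints the parsed, analyzed, optimized logical plans and the physical
 // plan; the physical plan shows where the shuffle happens.
 queryresult.explain(true)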


Yong


________________________________
From: Patrick <ti...@gmail.com>
Sent: Thursday, February 23, 2017 9:21 AM
To: user
Subject: Shuffling on Dataframe to RDD conversion with a map transformation

Hi,

I was wondering why there are two stages (a shuffle write and a shuffle read) at line 194,
when I am converting the DataFrame obtained from the SQL query into an RDD. This prevents the job from completing, and it doesn't scale to TBs of data.

Also, I have set the shuffle fraction to 0.6 and the memory fraction to 0.2 while executing the job.

 val queryresult = hiveCtx.sql("select  Name,Age,Product1,Product2,Product3,count(*), max(Month)  from test GROUP BY Name,Age,Product1,Product2,Product3 GROUPING SETS ((Name,Age,Product1),(Name,Age,Product2),(Name,Age,Product3))")

line 194:

val resultrdd = queryresult.map(x => (x.get(0), (x.get(1), x.get(2), x.get(3), x.get(4), x.get(5))))




Any insights into the problem would be very helpful.

Thanks