Posted to user@spark.apache.org by Cesar Flores <ce...@gmail.com> on 2015/10/14 17:45:11 UTC

Question about data frame partitioning in Spark 1.3.0

I am currently using Spark 1.3.0, and my question is the following:

I have large data frames where the main field is a user ID. I need to do
many group-bys and joins using that field. Will performance increase if,
before doing any group-by or join operation, I first convert to an RDD in
order to partition by the user ID? In other words, will trying something
like the next lines on all my user data tables improve performance in the
long run?

import org.apache.spark.HashPartitioner

val partitioned_rdd = unpartitioned_df
   .map(row => (row.getLong(0), row))      // key each row by the user ID
   .partitionBy(new HashPartitioner(200))  // hash-partition into 200 partitions
   .map(_._2)                              // drop the key, keep the Row

val partitioned_df =
   hc.createDataFrame(partitioned_rdd, unpartitioned_df.schema)
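
For context, the downstream operations I have in mind would look roughly
like the following sketch (the column name user_id and the second table
other_df are just placeholders, not my actual schema):

val counts = partitioned_df.groupBy("user_id").count()
val joined = partitioned_df.join(other_df,
   partitioned_df("user_id") === other_df("user_id"))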




Thanks a lot
-- 
Cesar Flores

Re: Question about data frame partitioning in Spark 1.3.0

Posted by Michael Armbrust <mi...@databricks.com>.
Caching the partitioned_df <- this one, but you have to do the
partitioning using something like sql("SELECT * FROM ... CLUSTER BY a"), as
there is no such operation exposed on DataFrames.
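
For illustration, a minimal sketch of that approach (the table name
"users" and the column "user_id" are placeholders):

unpartitioned_df.registerTempTable("users")
val clustered_df = hc.sql("SELECT * FROM users CLUSTER BY user_id")
clustered_df.cache()   // cache the clustered data so later queries reuse it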

2) Here is the JIRA: https://issues.apache.org/jira/browse/SPARK-5354

Re: Question about data frame partitioning in Spark 1.3.0

Posted by Cesar Flores <ce...@gmail.com>.
Thanks Michael for your input.


By 1), do you mean:

   - Caching the partitioned_rdd
   - Caching the partitioned_df
   - Or just caching unpartitioned_df, without the need of creating the
     partitioned_rdd variable?


Could you expand a little bit more on 2)?


Thanks!

On Wed, Oct 14, 2015 at 12:11 PM, Michael Armbrust <mi...@databricks.com>
wrote:

> This won't help, for two reasons:
>  1) It's all still just creating lineage, since you aren't caching the
> partitioned data.  It will still fetch the shuffled blocks for each query.
>  2) The query optimizer is not aware of RDD-level partitioning, since it's
> mostly a black box.
>
> 1) could be fixed by adding caching.  2) is on our roadmap (though you'd
> have to use logical DataFrame expressions to do the partitioning instead of
> a class-based partitioner).
>
> On Wed, Oct 14, 2015 at 8:45 AM, Cesar Flores <ce...@gmail.com> wrote:
>
>>
>> I am currently using Spark 1.3.0, and my question is the following:
>>
>> I have large data frames where the main field is a user ID. I need to do
>> many group-bys and joins using that field. Will performance increase if,
>> before doing any group-by or join operation, I first convert to an RDD in
>> order to partition by the user ID? In other words, will trying something
>> like the next lines on all my user data tables improve performance in the
>> long run?
>>
>> import org.apache.spark.HashPartitioner
>>
>> val partitioned_rdd = unpartitioned_df
>>    .map(row => (row.getLong(0), row))      // key each row by the user ID
>>    .partitionBy(new HashPartitioner(200))  // hash-partition into 200 partitions
>>    .map(_._2)                              // drop the key, keep the Row
>>
>> val partitioned_df =
>>    hc.createDataFrame(partitioned_rdd, unpartitioned_df.schema)
>>
>>
>>
>>
>> Thanks a lot
>> --
>> Cesar Flores
>>
>
>


-- 
Cesar Flores

Re: Question about data frame partitioning in Spark 1.3.0

Posted by Michael Armbrust <mi...@databricks.com>.
This won't help, for two reasons:
 1) It's all still just creating lineage, since you aren't caching the
partitioned data.  It will still fetch the shuffled blocks for each query.
 2) The query optimizer is not aware of RDD-level partitioning, since it's
mostly a black box.

1) could be fixed by adding caching.  2) is on our roadmap (though you'd
have to use logical DataFrame expressions to do the partitioning instead of
a class-based partitioner).
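
For 1), a minimal sketch of adding caching on top of your snippet (names
taken from your code):

partitioned_df.cache()   // keep the shuffled data in memory
partitioned_df.count()   // materialize the cache so later queries reuse it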

On Wed, Oct 14, 2015 at 8:45 AM, Cesar Flores <ce...@gmail.com> wrote:

>
> I am currently using Spark 1.3.0, and my question is the following:
>
> I have large data frames where the main field is a user ID. I need to do
> many group-bys and joins using that field. Will performance increase if,
> before doing any group-by or join operation, I first convert to an RDD in
> order to partition by the user ID? In other words, will trying something
> like the next lines on all my user data tables improve performance in the
> long run?
>
> import org.apache.spark.HashPartitioner
>
> val partitioned_rdd = unpartitioned_df
>    .map(row => (row.getLong(0), row))      // key each row by the user ID
>    .partitionBy(new HashPartitioner(200))  // hash-partition into 200 partitions
>    .map(_._2)                              // drop the key, keep the Row
>
> val partitioned_df =
>    hc.createDataFrame(partitioned_rdd, unpartitioned_df.schema)
>
>
>
>
> Thanks a lot
> --
> Cesar Flores
>