You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by David Diebold <da...@gmail.com> on 2022/04/11 15:46:49 UTC

Question about bucketing and custom partitioners

Hello,

I have a few questions related to bucketing and custom partitioning in
dataframe api.

I am considering bucketing to perform one-side free shuffle join in
incremental jobs, but there is one thing that I'm not happy with.
Data is likely to grow/skew over time. At some point, i would need to
change amount of buckets which would provoke shuffle.

Instead of this, I would like to use a custom partitioner, that would
replace shuffle by narrow transformation.
That is something that was feasible with RDD developer api. For example, I
could use such partitioning scheme:
partition_id = (nb_partitions-1) * ( hash(column) - Int.minValue) /
(Int.maxValue - Int.minValue)
When I multiply amount of partitions by 2 each new partition depends only
on one partition from parent (=> narrow transformation)

So, here are my questions :

1/ Is it possible to use custom partitioner when saving a dataframe with
bucketing ?
2/ Still with the API dataframe, is it possible to apply custom partitioner
to a dataframe ?
    Is it possible to repartition the dataframe with a narrow
transformation like what could be done with RDD ?
    Is there some sort of dataframe developer API ? Do you have any
pointers on this ?

Thanks !
David

Re: Question about bucketing and custom partitioners

Posted by Gourav Sengupta <go...@gmail.com>.
Hi,
have you checked skew settings in SPARK 3.2?

I am also not quite sure why you need a custom partitioner? While RDD still
remains a valid option you must try to explore the recent ways of thinking
and framing better solutions using SPARK.

Regards,
Gourav Sengupta

On Mon, Apr 11, 2022 at 4:47 PM David Diebold <da...@gmail.com>
wrote:

> Hello,
>
> I have a few questions related to bucketing and custom partitioning in
> dataframe api.
>
> I am considering bucketing to perform one-side free shuffle join in
> incremental jobs, but there is one thing that I'm not happy with.
> Data is likely to grow/skew over time. At some point, i would need to
> change amount of buckets which would provoke shuffle.
>
> Instead of this, I would like to use a custom partitioner, that would
> replace shuffle by narrow transformation.
> That is something that was feasible with RDD developer api. For example, I
> could use such partitioning scheme:
> partition_id = (nb_partitions-1) * ( hash(column) - Int.minValue) /
> (Int.maxValue - Int.minValue)
> When I multiply amount of partitions by 2 each new partition depends only
> on one partition from parent (=> narrow transformation)
>
> So, here are my questions :
>
> 1/ Is it possible to use custom partitioner when saving a dataframe with
> bucketing ?
> 2/ Still with the API dataframe, is it possible to apply custom
> partitioner to a dataframe ?
>     Is it possible to repartition the dataframe with a narrow
> transformation like what could be done with RDD ?
>     Is there some sort of dataframe developer API ? Do you have any
> pointers on this ?
>
> Thanks !
> David
>

Re: Question about bucketing and custom partitioners

Posted by ayan guha <gu...@gmail.com>.
IMHO you should ask this to dev email for better response and suggestions

On Tue, 12 Apr 2022 at 1:47 am, David Diebold <da...@gmail.com>
wrote:

> Hello,
>
> I have a few questions related to bucketing and custom partitioning in
> dataframe api.
>
> I am considering bucketing to perform one-side free shuffle join in
> incremental jobs, but there is one thing that I'm not happy with.
> Data is likely to grow/skew over time. At some point, i would need to
> change amount of buckets which would provoke shuffle.
>
> Instead of this, I would like to use a custom partitioner, that would
> replace shuffle by narrow transformation.
> That is something that was feasible with RDD developer api. For example, I
> could use such partitioning scheme:
> partition_id = (nb_partitions-1) * ( hash(column) - Int.minValue) /
> (Int.maxValue - Int.minValue)
> When I multiply amount of partitions by 2 each new partition depends only
> on one partition from parent (=> narrow transformation)
>
> So, here are my questions :
>
> 1/ Is it possible to use custom partitioner when saving a dataframe with
> bucketing ?
> 2/ Still with the API dataframe, is it possible to apply custom
> partitioner to a dataframe ?
>     Is it possible to repartition the dataframe with a narrow
> transformation like what could be done with RDD ?
>     Is there some sort of dataframe developer API ? Do you have any
> pointers on this ?
>
> Thanks !
>
> David
>
-- 
Best Regards,
Ayan Guha

Re: Question about bucketing and custom partitioners

Posted by "Lalwani, Jayesh" <jl...@amazon.com.INVALID>.
You can partition and bucket a Dataframe by any column. You can create a column using an expression. You can add a paritition_id column to your dataframe, and partition/bucket by that column

From: David Diebold <da...@gmail.com>
Date: Monday, April 11, 2022 at 11:48 AM
To: "user @spark" <us...@spark.apache.org>
Subject: [EXTERNAL] Question about bucketing and custom partitioners


CAUTION: This email originated from outside of the organization. Do not click links or open attachments unless you can confirm the sender and know the content is safe.


Hello,

I have a few questions related to bucketing and custom partitioning in dataframe api.

I am considering bucketing to perform one-side free shuffle join in incremental jobs, but there is one thing that I'm not happy with.
Data is likely to grow/skew over time. At some point, i would need to change amount of buckets which would provoke shuffle.

Instead of this, I would like to use a custom partitioner, that would replace shuffle by narrow transformation.
That is something that was feasible with RDD developer api. For example, I could use such partitioning scheme:
partition_id = (nb_partitions-1) * ( hash(column) - Int.minValue) / (Int.maxValue - Int.minValue)
When I multiply amount of partitions by 2 each new partition depends only on one partition from parent (=> narrow transformation)

So, here are my questions :

1/ Is it possible to use custom partitioner when saving a dataframe with bucketing ?
2/ Still with the API dataframe, is it possible to apply custom partitioner to a dataframe ?
    Is it possible to repartition the dataframe with a narrow transformation like what could be done with RDD ?
    Is there some sort of dataframe developer API ? Do you have any pointers on this ?

Thanks !
David