Posted to user@spark.apache.org by Shivam Verma <ra...@gmail.com> on 2022/12/14 05:25:03 UTC

Check if shuffle is caused for repartitioned pyspark dataframes

Hello folks,

I have a use case where I save two pyspark dataframes as parquet files and
then use them later to join with each other or with other tables and
perform multiple aggregations.

Since I know the column being used in the downstream joins and groupby, I
was hoping I could use co-partitioning for the two dataframes when saving
them and avoid shuffle later.

I repartitioned the two dataframes (providing same number of partitions and
same column for repartitioning).
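
For reference, the save step looks roughly like this (the column name "id",
the partition count, and the paths below are placeholders for my actual
setup):

    # Repartition both dataframes on the join/groupby column with the same
    # number of partitions before persisting them as parquet.
    num_partitions = 4

    df1.repartition(num_partitions, "id").write.mode("overwrite").parquet("s3a://bucket/df1")
    df2.repartition(num_partitions, "id").write.mode("overwrite").parquet("s3a://bucket/df2")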

While I'm seeing an *improvement in execution time* with the above
approach, how do I confirm that a shuffle is actually NOT happening (maybe
through SparkUI)?
The spark plan and shuffle read/write are the same in the two scenarios:
1. Using repartitioned dataframes to perform join+aggregation
2. Using the base dataframes themselves (without explicit repartitioning) to
perform join+aggregation

I have a StackOverflow post with more details regarding the same:
https://stackoverflow.com/q/74771971/14741697

Thanks in advance, appreciate your help!

Regards,
Shivam

Re: EXT: Re: Check if shuffle is caused for repartitioned pyspark dataframes

Posted by Vibhor Gupta <Vi...@walmart.com.INVALID>.
Hi Shivam,

I think what you are looking for is bucket optimization: the execution engine (Spark) knows how the data was shuffled before persisting it.
Unfortunately this is not supported when you use vanilla parquet files.
Try saving the dataframe using the saveAsTable<https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameWriter.saveAsTable.html> API along with the bucketBy<https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameWriter.bucketBy.html> option.
I think the new table formats also support bucketing.

You can also go through this<https://blog.clairvoyantsoft.com/bucketing-in-spark-878d2e02140f> medium article that describes a similar problem as yours.
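
A rough sketch of the idea (the bucket count, column name and table names
below are only examples, adjust them for your data):

    # Write both dataframes as bucketed tables so Spark records how the data
    # was distributed. When both sides are bucketed on the same column with
    # the same bucket count, the sort-merge join can skip the Exchange.
    (df1.write
        .bucketBy(16, "id")
        .sortBy("id")
        .format("parquet")
        .mode("overwrite")
        .saveAsTable("df1_bucketed"))

    (df2.write
        .bucketBy(16, "id")
        .sortBy("id")
        .format("parquet")
        .mode("overwrite")
        .saveAsTable("df2_bucketed"))

    # Read them back as tables (not as plain parquet paths) before joining.
    joined = spark.table("df1_bucketed").join(spark.table("df2_bucketed"), "id")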

Regards,
Vibhor Gupta

________________________________
From: Shivam Verma <ra...@gmail.com>
Sent: Monday, December 26, 2022 8:08 PM
To: Russell Jurney <ru...@gmail.com>
Cc: Gurunandan <gu...@gmail.com>; user@spark.apache.org <us...@spark.apache.org>
Subject: EXT: Re: Check if shuffle is caused for repartitioned pyspark dataframes

I tried sorting the repartitioned dataframes on the partition key before saving them as parquet files. However, when I read those repartitioned-and-sorted dataframes back
and join them on the partition key, the Spark plan still shows an `Exchange hashpartitioning` step, which I want to avoid:

== Physical Plan ==
*(5) HashAggregate(keys=[id#15373L], functions=[sum(col_20#15393), sum(col_40#15413), max(col_60#15433)])
+- *(5) HashAggregate(keys=[id#15373L], functions=[partial_sum(col_20#15393), partial_sum(col_40#15413), partial_max(col_60#15433)])
   +- *(5) Project [id#15373L, col_20#15393, col_40#15413, col_60#15433]
      +- *(5) SortMergeJoin [id#15373L], [id#15171L], Inner
         :- *(2) Sort [id#15373L ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(id#15373L, 4) <========================= Want to avoid this
         :     +- *(1) Project [id#15373L, col_20#15393, col_40#15413, col_60#15433]
         :        +- *(1) Filter isnotnull(id#15373L)
         :           +- *(1) FileScan parquet [id#15373L,col_20#15393,col_40#15413,col_60#15433] Batched: true, Format: Parquet, Location: InMemoryFileIndex[s3a://performance/co_partition_test/repartitioned_df2..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint,col_20:double,col_40:double,col_60:double>
         +- *(4) Sort [id#15171L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(id#15171L, 4) <========================= Want to avoid this
               +- *(3) Project [id#15171L]
                  +- *(3) Filter isnotnull(id#15171L)
                     +- *(3) FileScan parquet [id#15171L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[s3a://performance/co_partition_test/repartitioned_df1..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>

How do I ensure that this Exchange hashpartitioning step is skipped? Or does the Exchange hashpartitioning appear in the Spark plan but not actually perform any repartitioning, so that no overhead is involved?

On Fri, Dec 23, 2022 at 10:08 PM Russell Jurney <ru...@gmail.com> wrote:
This may not be good advice but... could you sort by the partition key to ensure the partitions match up? Thinking of olden times :)

On Fri, Dec 23, 2022 at 4:42 AM Shivam Verma <ra...@gmail.com> wrote:
Hi Gurunandan,

Thanks for the reply!

I do see the exchange operator in the SQL tab, but I can see it in both the experiments:
1. Using repartitioned dataframes
2. Using initial dataframes

Does that mean that the repartitioned dataframes are not actually "co-partitioned"?
If that's the case, I have two more questions:

1. Why is the job with repartitioned dataframes faster (at least 3x) as compared to the job using initial dataframes?
2. How do I ensure co-partitioning for pyspark dataframes?

Thanks,
Shivam



On Wed, Dec 14, 2022 at 5:58 PM Gurunandan <gu...@gmail.com> wrote:
Hi,
One of the options for validation is to navigate `SQL TAB` in Spark UI
and click on a Query of interest to view detailed information of each
Query. We need to validate if the Exchange Operator is present for
shuffle, like shared in the attachment.

Otherwise we can print the executed plan and validate for Exchange
Operator in the Physical Plan.

On Wed, Dec 14, 2022 at 10:56 AM Shivam Verma <ra...@gmail.com> wrote:
>
> Hello folks,
>
> I have a use case where I save two pyspark dataframes as parquet files and then use them later to join with each other or with other tables and perform multiple aggregations.
>
> Since I know the column being used in the downstream joins and groupby, I was hoping I could use co-partitioning for the two dataframes when saving them and avoid shuffle later.
>
> I repartitioned the two dataframes (providing same number of partitions and same column for repartitioning).
>
> While I'm seeing an improvement in execution time with the above approach, how do I confirm that a shuffle is actually NOT happening (maybe through SparkUI)?
> The spark plan and shuffle read/write are the same in the two scenarios:
> 1. Using repartitioned dataframes to perform join+aggregation
> 2. Using the base dataframes themselves (without explicit repartitioning) to perform join+aggregation
>
> I have a StackOverflow post with more details regarding the same:
> https://stackoverflow.com/q/74771971/14741697
>
> Thanks in advance, appreciate your help!
>
> Regards,
> Shivam
>

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org
--

Thanks,
Russell Jurney @rjurney <http://twitter.com/rjurney> russell.jurney@gmail.com LI <http://linkedin.com/in/russelljurney> FB <http://facebook.com/jurney> datasyndrome.com Book a time on Calendly <https://calendly.com/rjurney_personal/30min>

Re: Check if shuffle is caused for repartitioned pyspark dataframes

Posted by Shivam Verma <ra...@gmail.com>.
I tried sorting the repartitioned dataframes on the partition key before
saving them as parquet files. However, when I read those
repartitioned-and-sorted dataframes back and join them on the partition
key, the Spark plan still shows an `Exchange hashpartitioning` step, which
I want to avoid:

== Physical Plan ==
*(5) HashAggregate(keys=[id#15373L], functions=[sum(col_20#15393), sum(col_40#15413), max(col_60#15433)])
+- *(5) HashAggregate(keys=[id#15373L], functions=[partial_sum(col_20#15393), partial_sum(col_40#15413), partial_max(col_60#15433)])
   +- *(5) Project [id#15373L, col_20#15393, col_40#15413, col_60#15433]
      +- *(5) SortMergeJoin [id#15373L], [id#15171L], Inner
         :- *(2) Sort [id#15373L ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(id#15373L, 4) <========================= Want to avoid this
         :     +- *(1) Project [id#15373L, col_20#15393, col_40#15413, col_60#15433]
         :        +- *(1) Filter isnotnull(id#15373L)
         :           +- *(1) FileScan parquet [id#15373L,col_20#15393,col_40#15413,col_60#15433] Batched: true, Format: Parquet, Location: InMemoryFileIndex[s3a://performance/co_partition_test/repartitioned_df2..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint,col_20:double,col_40:double,col_60:double>
         +- *(4) Sort [id#15171L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(id#15171L, 4) <========================= Want to avoid this
               +- *(3) Project [id#15171L]
                  +- *(3) Filter isnotnull(id#15171L)
                     +- *(3) FileScan parquet [id#15171L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[s3a://performance/co_partition_test/repartitioned_df1..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>

How do I ensure that this *Exchange hashpartitioning* step is skipped? Or
does the Exchange hashpartitioning appear in the Spark plan but not
actually perform any repartitioning, so that no overhead is involved?

On Fri, Dec 23, 2022 at 10:08 PM Russell Jurney <ru...@gmail.com>
wrote:

> This may not be good advice but... could you sort by the partition key to
> ensure the partitions match up? Thinking of olden times :)
>
> On Fri, Dec 23, 2022 at 4:42 AM Shivam Verma <ra...@gmail.com>
> wrote:
>
>> Hi Gurunandan,
>>
>> Thanks for the reply!
>>
>> I do see the exchange operator in the SQL tab, but I can see it in both
>> the experiments:
>> 1. Using repartitioned dataframes
>> 2. Using initial dataframes
>>
>> Does that mean that the repartitioned dataframes are not actually
>> "co-partitioned"?
>> If that's the case, I have two more questions:
>>
>> 1. Why is the job with repartitioned dataframes faster (at least 3x) as
>> compared to the job using initial dataframes?
>> 2. How do I ensure co-partitioning for pyspark dataframes?
>>
>> Thanks,
>> Shivam
>>
>>
>>
>> On Wed, Dec 14, 2022 at 5:58 PM Gurunandan <gu...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> One of the options for validation is to navigate `SQL TAB` in Spark UI
>>> and click on a Query of interest to view detailed information of each
>>> Query. We need to validate if the Exchange Operator is present for
>>> shuffle, like shared in the attachment.
>>>
>>> Otherwise we can print the executed plan and validate for Exchange
>>> Operator in the Physical Plan.
>>>
>>> On Wed, Dec 14, 2022 at 10:56 AM Shivam Verma <ra...@gmail.com>
>>> wrote:
>>> >
>>> > Hello folks,
>>> >
>>> > I have a use case where I save two pyspark dataframes as parquet files
>>> and then use them later to join with each other or with other tables and
>>> perform multiple aggregations.
>>> >
>>> > Since I know the column being used in the downstream joins and
>>> groupby, I was hoping I could use co-partitioning for the two dataframes
>>> when saving them and avoid shuffle later.
>>> >
>>> > I repartitioned the two dataframes (providing same number of
>>> partitions and same column for repartitioning).
>>> >
>>> > While I'm seeing an improvement in execution time with the above
>>> approach, how do I confirm that a shuffle is actually NOT happening (maybe
>>> through SparkUI)?
>>> > The spark plan and shuffle read/write are the same in the two
>>> scenarios:
>>> > 1. Using repartitioned dataframes to perform join+aggregation
>>> > 2. Using the base dataframes themselves (without explicit repartitioning) to
>>> perform join+aggregation
>>> >
>>> > I have a StackOverflow post with more details regarding the same:
>>> > https://stackoverflow.com/q/74771971/14741697
>>> >
>>> > Thanks in advance, appreciate your help!
>>> >
>>> > Regards,
>>> > Shivam
>>> >
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>>
>> --
>
> Thanks,
> Russell Jurney @rjurney <http://twitter.com/rjurney>
> russell.jurney@gmail.com LI <http://linkedin.com/in/russelljurney> FB
> <http://facebook.com/jurney> datasyndrome.com Book a time on Calendly
> <https://calendly.com/rjurney_personal/30min>
>

Re: Check if shuffle is caused for repartitioned pyspark dataframes

Posted by Russell Jurney <ru...@gmail.com>.
This may not be good advice but... could you sort by the partition key to
ensure the partitions match up? Thinking of olden times :)
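
Something like this is what I had in mind (just an untested sketch, the
path, partition count and column name are made up):

    # Repartition on the join key and sort within each partition before
    # writing, so matching keys land in aligned, ordered partitions.
    (df1.repartition(4, "id")
        .sortWithinPartitions("id")
        .write.mode("overwrite")
        .parquet("s3a://bucket/df1_sorted"))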

On Fri, Dec 23, 2022 at 4:42 AM Shivam Verma <ra...@gmail.com>
wrote:

> Hi Gurunandan,
>
> Thanks for the reply!
>
> I do see the exchange operator in the SQL tab, but I can see it in both
> the experiments:
> 1. Using repartitioned dataframes
> 2. Using initial dataframes
>
> Does that mean that the repartitioned dataframes are not actually
> "co-partitioned"?
> If that's the case, I have two more questions:
>
> 1. Why is the job with repartitioned dataframes faster (at least 3x) as
> compared to the job using initial dataframes?
> 2. How do I ensure co-partitioning for pyspark dataframes?
>
> Thanks,
> Shivam
>
>
>
> On Wed, Dec 14, 2022 at 5:58 PM Gurunandan <gu...@gmail.com>
> wrote:
>
>> Hi,
>> One of the options for validation is to navigate `SQL TAB` in Spark UI
>> and click on a Query of interest to view detailed information of each
>> Query. We need to validate if the Exchange Operator is present for
>> shuffle, like shared in the attachment.
>>
>> Otherwise we can print the executed plan and validate for Exchange
>> Operator in the Physical Plan.
>>
>> On Wed, Dec 14, 2022 at 10:56 AM Shivam Verma <ra...@gmail.com>
>> wrote:
>> >
>> > Hello folks,
>> >
>> > I have a use case where I save two pyspark dataframes as parquet files
>> and then use them later to join with each other or with other tables and
>> perform multiple aggregations.
>> >
>> > Since I know the column being used in the downstream joins and groupby,
>> I was hoping I could use co-partitioning for the two dataframes when saving
>> them and avoid shuffle later.
>> >
>> > I repartitioned the two dataframes (providing same number of partitions
>> and same column for repartitioning).
>> >
>> > While I'm seeing an improvement in execution time with the above
>> approach, how do I confirm that a shuffle is actually NOT happening (maybe
>> through SparkUI)?
>> > The spark plan and shuffle read/write are the same in the two scenarios:
>> > 1. Using repartitioned dataframes to perform join+aggregation
>> > 2. Using the base dataframes themselves (without explicit repartitioning) to
>> perform join+aggregation
>> >
>> > I have a StackOverflow post with more details regarding the same:
>> > https://stackoverflow.com/q/74771971/14741697
>> >
>> > Thanks in advance, appreciate your help!
>> >
>> > Regards,
>> > Shivam
>> >
>>
>> ---------------------------------------------------------------------
>> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>
> --

Thanks,
Russell Jurney @rjurney <http://twitter.com/rjurney>
russell.jurney@gmail.com LI <http://linkedin.com/in/russelljurney> FB
<http://facebook.com/jurney> datasyndrome.com Book a time on Calendly
<https://calendly.com/rjurney_personal/30min>

Re: Check if shuffle is caused for repartitioned pyspark dataframes

Posted by Shivam Verma <ra...@gmail.com>.
Hi Gurunandan,

Thanks for the reply!

I do see the exchange operator in the SQL tab, but I can see it in both the
experiments:
1. Using repartitioned dataframes
2. Using initial dataframes

Does that mean that the repartitioned dataframes are not actually
"co-partitioned"?
If that's the case, I have two more questions:

1. Why is the job with repartitioned dataframes faster (at least 3x) as
compared to the job using initial dataframes?
2. How do I ensure co-partitioning for pyspark dataframes?

Thanks,
Shivam



On Wed, Dec 14, 2022 at 5:58 PM Gurunandan <gu...@gmail.com> wrote:

> Hi,
> One of the options for validation is to navigate `SQL TAB` in Spark UI
> and click on a Query of interest to view detailed information of each
> Query. We need to validate if the Exchange Operator is present for
> shuffle, like shared in the attachment.
>
> Otherwise we can print the executed plan and validate for Exchange
> Operator in the Physical Plan.
>
> On Wed, Dec 14, 2022 at 10:56 AM Shivam Verma <ra...@gmail.com>
> wrote:
> >
> > Hello folks,
> >
> > I have a use case where I save two pyspark dataframes as parquet files
> and then use them later to join with each other or with other tables and
> perform multiple aggregations.
> >
> > Since I know the column being used in the downstream joins and groupby,
> I was hoping I could use co-partitioning for the two dataframes when saving
> them and avoid shuffle later.
> >
> > I repartitioned the two dataframes (providing same number of partitions
> and same column for repartitioning).
> >
> > While I'm seeing an improvement in execution time with the above
> approach, how do I confirm that a shuffle is actually NOT happening (maybe
> through SparkUI)?
> > The spark plan and shuffle read/write are the same in the two scenarios:
> > 1. Using repartitioned dataframes to perform join+aggregation
> > 2. Using the base dataframes themselves (without explicit repartitioning) to
> perform join+aggregation
> >
> > I have a StackOverflow post with more details regarding the same:
> > https://stackoverflow.com/q/74771971/14741697
> >
> > Thanks in advance, appreciate your help!
> >
> > Regards,
> > Shivam
> >
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscribe@spark.apache.org

Re: Check if shuffle is caused for repartitioned pyspark dataframes

Posted by Gurunandan <gu...@gmail.com>.
Hi,
One option for validation is to open the `SQL` tab in the Spark UI
and click on the query of interest to view its detailed information.
We need to check whether an Exchange operator is present, which
indicates a shuffle, as shared in the attachment.

Otherwise, we can print the executed plan and check for an Exchange
operator in the physical plan.
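
For example (a minimal illustration, assuming df1 and df2 are the two
dataframes and "id" is the join column):

    # Build the query, then print the executed plan and look for an
    # Exchange operator; its presence means a shuffle happens at that step.
    result = df1.join(df2, "id").groupBy("id").count()
    result.explain()      # physical plan only
    result.explain(True)  # parsed, analyzed, optimized and physical plans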

On Wed, Dec 14, 2022 at 10:56 AM Shivam Verma <ra...@gmail.com> wrote:
>
> Hello folks,
>
> I have a use case where I save two pyspark dataframes as parquet files and then use them later to join with each other or with other tables and perform multiple aggregations.
>
> Since I know the column being used in the downstream joins and groupby, I was hoping I could use co-partitioning for the two dataframes when saving them and avoid shuffle later.
>
> I repartitioned the two dataframes (providing same number of partitions and same column for repartitioning).
>
> While I'm seeing an improvement in execution time with the above approach, how do I confirm that a shuffle is actually NOT happening (maybe through SparkUI)?
> The spark plan and shuffle read/write are the same in the two scenarios:
> 1. Using repartitioned dataframes to perform join+aggregation
> 2. Using the base dataframes themselves (without explicit repartitioning) to perform join+aggregation
>
> I have a StackOverflow post with more details regarding the same:
> https://stackoverflow.com/q/74771971/14741697
>
> Thanks in advance, appreciate your help!
>
> Regards,
> Shivam
>