Posted to user@spark.apache.org by Swetha Baskaran <sw...@gmail.com> on 2022/09/16 03:42:45 UTC

[Spark Internals]: Is sort order preserved after partitioned write?

Hi!

We expected the order of sorted partitions to be preserved after a
DataFrame write. We use the following code to write out one file per
partition, with the rows sorted by a column:

    df
        .repartition($"col1")
        .sortWithinPartitions("col1", "col2")
        .write
        .partitionBy("col1")
        .csv(path)

However, we observe an unexpected sort order in some files. Does Spark
guarantee sort order within partitions on write?


Thanks,
swebask

Re: [Spark Internals]: Is sort order preserved after partitioned write?

Posted by Swetha Baskaran <sw...@gmail.com>.
Hi Enrico,

We are using Spark version 3.1.3, and turning AQE off seems to fix the
sorting. We are looking into why; do you have any thoughts?

Thanks, Swetha


Re: [Spark Internals]: Is sort order preserved after partitioned write?

Posted by Enrico Minack <in...@enrico.minack.dev>.
Hi,

From a quick glance over your transformations, sortCol should be sorted.

Are you using Spark 3.2 or above? Can you try again with AQE turned off 
in that case?

https://spark.apache.org/docs/latest/sql-performance-tuning.html#adaptive-query-execution

Enrico
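
For reference, AQE (adaptive query execution) is controlled by the
spark.sql.adaptive.enabled setting described on the page linked above; it is
enabled by default from Spark 3.2.0 onward. The following is a minimal sketch
of switching it off, not a statement of how the original job was configured.
The object name, application name, and local master are assumptions made for
illustration only.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical wrapper object, used only to make the sketch runnable.
object AqeOffSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("aqe-off-sketch")   // hypothetical application name
      .master("local[*]")          // assumption: local run for illustration
      .config("spark.sql.adaptive.enabled", "false") // disable AQE at session creation
      .getOrCreate()

    // The same setting can also be changed at runtime on an existing session.
    spark.conf.set("spark.sql.adaptive.enabled", "false")

    // Print the effective value to confirm what the session is using.
    println(spark.conf.get("spark.sql.adaptive.enabled"))

    spark.stop()
  }
}
```

Setting the value when the session is built keeps the whole job consistent,
while the runtime call is handy for toggling it around a single write when
comparing output with and without AQE.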




Re: [Spark Internals]: Is sort order preserved after partitioned write?

Posted by Swetha Baskaran <sw...@gmail.com>.
Hi Enrico,

Thank you for your response!
Could you clarify what you mean by *values for "col1" will be "randomly"
allocated to partition files*?

We observe one file per partition; however, we see an alternating pattern of
unsorted rows in some files.
Here is the code used and the unsorted pattern observed in the output files.

    df
        .repartition(col("day"), col("month"), col("year"))
        .withColumn("partitionId", spark_partition_id)
        .withColumn("monotonicallyIncreasingIdUnsorted", monotonicallyIncreasingId)
        .sortWithinPartitions("year", "month", "day", "sortCol")
        .withColumn("monotonicallyIncreasingIdSorted", monotonicallyIncreasingId)
        .write
        .partitionBy("year", "month", "day")
        .parquet(path)

1
+-------+-----------+---------------------------------+-------------------------------+
|sortCol|partitionId|monotonicallyIncreasingIdUnsorted|monotonicallyIncreasingIdSorted|
+-------+-----------+---------------------------------+-------------------------------+
| 100000|        732|                    6287832121344|                  6287832121344|
|1170583|        732|                    6287842137820|                  6287876860586|
| 100000|        732|                    6287879216173|                  6287832121345|
|1170583|        732|                    6287890351126|                  6287876860587|
| 100000|        732|                    6287832569336|                  6287832121346|
|1170583|        732|                    6287843957457|                  6287876860588|
| 100000|        732|                    6287881576840|                  6287832121347|
|1170583|        732|                    6287892533054|                  6287876860589|
| 100000|        732|                    6287833244394|                  6287832121348|
|1170583|        732|                    6287847669077|                  6287876860590|
| 100000|        732|                    6287884414741|                  6287832121349|
|1170583|        732|                    6287894723328|                  6287876860591|
| 100000|        732|                    6287833768679|                  6287832121350|
|1170583|        732|                    6287849212375|                  6287876860592|
| 100000|        732|                    6287885330261|                  6287832121351|
|1170583|        732|                    6287896605691|                  6287876860593|
| 100000|        732|                    6287835089415|                  6287832121352|
|1170583|        732|                    6287851414977|                  6287876860594|
| 100000|        732|                    6287886356164|                  6287832121353|
|1170583|        732|                    6287899702397|                  6287876860595|
+-------+-----------+---------------------------------+-------------------------------+

2
+-------+-----------+---------------------------------+-------------------------------+
|sortCol|partitionId|monotonicallyIncreasingIdUnsorted|monotonicallyIncreasingIdSorted|
+-------+-----------+---------------------------------+-------------------------------+
| 100000|        136|                    1168231104512|                  1168231104512|
|1215330|        136|                    1168267800695|                  1168275843754|
| 100000|        136|                    1168365908174|                  1168231104513|
|1215330|        136|                    1168272121474|                  1168275843755|
| 100000|        136|                    1168233930111|                  1168231104514|
|1215330|        136|                    1168275020862|                  1168275843756|
| 100000|        136|                    1168369592448|                  1168231104515|
|1215331|        136|                    1168320722989|                  1168275843757|
| 100000|        136|                    1168235423908|                  1168231104516|
|1215331|        136|                    1168232219843|                  1168275843758|
| 100000|        136|                    1168276450874|                  1168231104517|
|1215331|        136|                    1168330171556|                  1168275843759|
| 100000|        136|                    1168239878974|                  1168231104518|
|1215331|        136|                    1168235045442|                  1168275843760|
| 100000|        136|                    1168287069249|                  1168231104519|
|1215331|        136|                    1168331936649|                  1168275843761|
| 100000|        136|                    1168246605999|                  1168231104520|
|1215331|        136|                    1168236539239|                  1168275843762|
| 100000|        136|                    1168289197499|                  1168231104521|
|1215331|        136|                    1168337136110|                  1168275843763|
+-------+-----------+---------------------------------+-------------------------------+

3
+-------+-----------+---------------------------------+-------------------------------+
|sortCol|partitionId|monotonicallyIncreasingIdUnsorted|monotonicallyIncreasingIdSorted|
+-------+-----------+---------------------------------+-------------------------------+
| 100000|        581|                    4990751997952|                  4990751997952|
|1207875|        581|                    4990829438530|                  4990796737194|
| 100000|        581|                    4990797772249|                  4990751997953|
|1207875|        581|                    4990789773711|                  4990796737195|
| 100000|        581|                    4990754836237|                  4990751997954|
|1207875|        581|                    4990792883763|                  4990796737196|
| 100000|        581|                    4990799663372|                  4990751997955|
|1207875|        581|                    4990795135016|                  4990796737197|
| 100000|        581|                    4990754889999|                  4990751997956|
|1207875|        581|                    4990796258628|                  4990796737198|
| 100000|        581|                    4990801912980|                  4990751997957|
|1207876|        581|                    4990798880125|                  4990796737199|
| 100000|        581|                    4990755328908|                  4990751997958|
|1207876|        581|                    4990753105828|                  4990796737200|
| 100000|        581|                    4990804520539|                  4990751997959|
|1207876|        581|                    4990800771248|                  4990796737201|
| 100000|        581|                    4990756046653|                  4990751997960|
|1207876|        581|                    4990757154529|                  4990796737202|
| 100000|        581|                    4990806212169|                  4990751997961|
|1207876|        581|                    4990803020856|                  4990796737203|
+-------+-----------+---------------------------------+-------------------------------+



Thanks,
Swetha
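
A note on reading the two ID columns in the tables above. Spark documents
that monotonically_increasing_id puts the partition ID in the upper 31 bits
of the value and the record number within the partition in the lower 33 bits.
Assuming the monotonicallyIncreasingId used in the code above follows the
same layout, the IDs can be split apart with the minimal sketch below. The
object and method names are hypothetical and not part of Spark.

```scala
// Hypothetical helper, not a Spark API: splits an ID produced under the
// documented layout (upper 31 bits = partition ID, lower 33 bits = record number).
object MonotonicIdSketch {
  private val RecordBits = 33
  private val RecordMask = (1L << RecordBits) - 1

  /** Returns (partitionId, recordNumberWithinPartition). */
  def decode(id: Long): (Int, Long) =
    ((id >>> RecordBits).toInt, id & RecordMask)

  def main(args: Array[String]): Unit = {
    // First four values of monotonicallyIncreasingIdSorted in output file 1.
    val sortedIds = Seq(6287832121344L, 6287876860586L, 6287832121345L, 6287876860587L)
    sortedIds.foreach { id =>
      val (partition, record) = decode(id)
      println(s"id=$id partition=$partition record=$record")
    }
    // prints partition=732 with record=0, 44739242, 1, 44739243 in that order
  }
}
```

Under that assumption, the rows were numbered 0, 1, 2, ... for sortCol 100000
and 44739242, 44739243, ... for sortCol 1170583 right after
sortWithinPartitions, all within partition 732. This suggests the data was in
sorted order at that point and that the written file interleaves two runs of
it.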




Re: [Spark Internals]: Is sort order preserved after partitioned write?

Posted by Enrico Minack <in...@enrico.minack.dev>.
Yes, you can expect each partition file to be sorted by "col1" and "col2".

However, values for "col1" will be "randomly" allocated to partition
files, though all rows with the same value for "col1" will reside in the
same partition file.

What kind of unexpected sort order do you observe?

Enrico
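
The "random" allocation described above comes from hash partitioning:
repartition on a column sends each row to the partition given by a hash of
the column value modulo the number of partitions. Below is a minimal sketch of
that idea in plain Scala. It uses String.hashCode as a stand-in (Spark itself
uses a Murmur3-based hash), so the partition numbers it prints will not match
Spark's. The object name, method name, and sample keys are hypothetical.

```scala
// Illustration only: String.hashCode stands in for Spark's Murmur3-based hash.
object HashPartitionSketch {
  /** Maps a key to a partition index in the range [0, numPartitions). */
  def partitionFor(key: String, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  def main(args: Array[String]): Unit = {
    // Hypothetical "col1" values; repeated values must land together.
    val col1Values = Seq("a", "b", "c", "a", "b", "a")
    val numPartitions = 4

    // Group the values by their assigned partition and print each group.
    col1Values
      .groupBy(partitionFor(_, numPartitions))
      .toSeq
      .sortBy(_._1)
      .foreach { case (partition, values) =>
        println(s"partition $partition: ${values.mkString(", ")}")
      }
  }
}
```

Which partition a given value lands in looks arbitrary, but it is
deterministic: equal values always hash to the same partition, which is why
all rows for one "col1" value end up together.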


