Posted to issues@spark.apache.org by "Naveen Nagaraj (Jira)" <ji...@apache.org> on 2022/02/10 10:03:00 UTC

[jira] [Updated] (SPARK-38172) Adaptive coalesce not working with df persist

     [ https://issues.apache.org/jira/browse/SPARK-38172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Naveen Nagaraj updated SPARK-38172:
-----------------------------------
    Attachment: image-2022-02-10-15-32-30-355.png

> Adaptive coalesce not working with df persist
> ---------------------------------------------
>
>                 Key: SPARK-38172
>                 URL: https://issues.apache.org/jira/browse/SPARK-38172
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.2.1
>         Environment: OS: Linux
> Spark Version: 3.2.3
>            Reporter: Naveen Nagaraj
>            Priority: Major
>         Attachments: image-2022-02-10-15-32-30-355.png
>
>
> {quote}val spark = SparkSession.builder().master("local[4]").appName("Test")
>                         .config("spark.sql.adaptive.enabled", "true")
>                         .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
>                         .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "50m")
>                         .config("spark.sql.adaptive.coalescePartitions.minPartitionNum", "1")
>                         .config("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "1024")
>                         .getOrCreate()
> val df = spark.read.csv("<Input File Path>")
> val df1 = df.distinct()
> df1.persist() // On removing this line, the code works as expected
> df1.write.csv("<Output File Path>")
> {quote}
> I have an input file of size 2 GB, which is read as 16 partitions of 128 MB each. I have enabled adaptive query execution to coalesce partitions after the shuffle.
> Without df1.persist, df1.write.csv writes 4 partition files of ~50 MB each, as expected:
> !image-2022-02-10-15-29-31-708.png!
> With df1.persist, Spark writes 200 partitions, i.e. adaptive coalescing is not applied:
> !image-2022-02-10-15-29-57-176.png!
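A possible workaround, sketched below under the assumption that the cached plan freezes the default 200 shuffle partitions before AQE can coalesce them: call coalesce explicitly on the persisted DataFrame before writing. The target of 4 partitions is taken from the reporter's numbers (2 GB at a ~50 MB advisory size) and should be tuned for the actual data volume; the file paths are the same placeholders as above.

```scala
// Workaround sketch (not a confirmed fix): coalesce explicitly instead of
// relying on AQE, which does not re-plan the partitioning of an
// already-cached exchange.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[4]")
  .appName("Test")
  .config("spark.sql.adaptive.enabled", "true")
  .getOrCreate()

val df1 = spark.read.csv("<Input File Path>").distinct()
df1.persist() // cached plan keeps the pre-coalesce shuffle partitioning
// Explicit coalesce bounds the number of output files regardless of AQE.
df1.coalesce(4).write.csv("<Output File Path>")
```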



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org