Posted to user@spark.apache.org by Benjamin Du <le...@outlook.com> on 2022/01/30 08:35:51 UTC

A Persisted Spark DataFrame is computed twice

I have some PySpark code like below. Basically, I persist a DataFrame (which is time-consuming to compute) to disk, call the method DataFrame.count to trigger the caching/persist immediately, and then I coalesce the DataFrame to reduce the number of partitions (the original DataFrame has 30,000 partitions) and output it to HDFS. Based on the execution time of job stages and the execution plan, it seems to me that the DataFrame is recomputed at df.coalesce(300). Does anyone know why this happens?


from pyspark import StorageLevel

df = spark.read.parquet("/input/hdfs/path") \
    .filter(...) \
    .withColumn("new_col", my_pandas_udf("col0", "col1")) \
    .persist(StorageLevel.DISK_ONLY)
df.count()
df.coalesce(300).write.mode("overwrite").parquet(output_mod)


BTW, it works well if I manually write the DataFrame to HDFS, read it back, coalesce it and write it back to HDFS.

Originally posted at https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice

Best,

----

Ben Du

Personal Blog<http://www.legendu.net/> | GitHub<https://github.com/dclong/> | Bitbucket<https://bitbucket.org/dclong/> | Docker Hub<https://hub.docker.com/r/dclong/>

Re: A Persisted Spark DataFrame is computed twice

Posted by Sean Owen <sr...@gmail.com>.
One guess - you are doing two things here, count() and write(). There is a
persist(), but it's async. It won't necessarily wait for the persist to
finish before proceeding and may have to recompute at least some partitions
for the second op. You could debug further by looking at the stages and
seeing what exactly is executing and where it uses cached partitions or not.
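
To make that check concrete: a plan that reads persisted data normally contains an InMemoryTableScan node (over an InMemoryRelation). The sketch below is an illustration, not code from this thread. The helper names explain_to_string and plan_reads_from_cache are made up here, and it assumes that df.explain() prints the plan from the Python side, as PySpark does.

```python
import contextlib
import io


def explain_to_string(df) -> str:
    """Capture whatever df.explain() prints and return it as a string."""
    buf = io.StringIO()
    with contextlib.redirect_stdout(buf):
        df.explain()
    return buf.getvalue()


def plan_reads_from_cache(plan_text: str) -> bool:
    """Return True if the plan text mentions a scan of persisted data."""
    return "InMemoryTableScan" in plan_text or "InMemoryRelation" in plan_text
```

With a live session, plan_reads_from_cache(explain_to_string(df.coalesce(300))) returning False would suggest that the write is not picking up the persisted data.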

On Mon, Jan 31, 2022 at 2:12 AM Benjamin Du <le...@outlook.com> wrote:

> I did check the execution plan, there were 2 stages and both stages show
> that the pandas UDF (which takes almost all the computation time of the
> DataFrame) is executed.
>
> It didn't seem to be an issue of repartition/coalesce as the DataFrame was
> still computed twice after removing coalesce.

Re: A Persisted Spark DataFrame is computed twice

Posted by Benjamin Du <le...@outlook.com>.
I did check the execution plan, there were 2 stages and both stages show that the pandas UDF (which takes almost all the computation time of the DataFrame) is executed.

It didn't seem to be an issue of repartition/coalesce as the DataFrame was still computed twice after removing coalesce.





Re: A Persisted Spark DataFrame is computed twice

Posted by Gourav Sengupta <go...@gmail.com>.
Hi,

without getting into suppositions, the best option is to look at the SQL
tab of the Spark UI.

It is the most wonderful tool for explaining what is happening, and why. In
Spark 3.x they have made the UI even better, with different levels of
granularity and detail.

On another note, you might want to read the difference between repartition
and coalesce before making any kind of assumptions.
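
For reference, the difference comes down to whether a shuffle is inserted. coalesce(n) is a narrow transformation, so the stage that feeds it can end up running with only n tasks; repartition(n) adds a shuffle, so the upstream stage keeps its original parallelism. A minimal sketch of the two options follows. shrink_and_write is a hypothetical helper, not something from this thread.

```python
def shrink_and_write(df, path, num_partitions, shuffle=True):
    """Write df as Parquet using num_partitions output partitions.

    shuffle=True  -> repartition(): adds a shuffle stage, so upstream work
                     keeps its own partition count.
    shuffle=False -> coalesce(): no shuffle, but upstream work may be
                     squeezed into num_partitions tasks.
    """
    if shuffle:
        shrunk = df.repartition(num_partitions)
    else:
        shrunk = df.coalesce(num_partitions)
    shrunk.write.mode("overwrite").parquet(path)
```

Called as shrink_and_write(df, output_mod, 300), this trades an extra shuffle for keeping the expensive stage at full parallelism; whether that trade pays off depends on the data volume.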


Regards,
Gourav Sengupta


Re: A Persisted Spark DataFrame is computed twice

Posted by Gourav Sengupta <go...@gmail.com>.
Hi,

Can you please try to use SPARK SQL, instead of dataframes and see the
difference?

You will get a lot of theoretical arguments, and that is fine, but they are
just largely and essentially theories.

Also try applying the function to the result of the filters as a sub-query,
caching the filtered data first.



Regards,
Gourav Sengupta


Re: A Persisted Spark DataFrame is computed twice

Posted by Benjamin Du <le...@outlook.com>.
I don't think coalesce itself (by repartitioning I assume you mean coalesce) and deserialising take that much time. To add a bit more context, the computation of the DataFrame is CPU-intensive rather than data/IO-intensive. I purposely keep coalesce after df.count because I want to keep the large number of partitions (30k) while computing the DataFrame, so that I get much higher parallelism. After the computation, I reduce the number of partitions (to avoid having too many small files on HDFS).

It typically takes about 5 hours to compute the DataFrame (with 30k partitions) and write it to disk (without repartitioning or coalescing). If I manually write the computed DataFrame to disk, read it back, coalesce it and then write it back to disk, it also takes about 5 hours. The code that I pasted in this thread takes forever to run, as the DataFrame is obviously recomputed at df.coalesce, and with a parallelism of only 300 partitions it is almost impossible to compute the DataFrame in a reasonable amount of time.

I tried various approaches, but none of them worked except manually writing to disk, reading it back, repartitioning/coalescing it, and then writing it back to HDFS.

  1.  checkpoint by itself computes the DataFrame twice. (This is a known existing bug of checkpoint.)

    output_mod = f"{output}/job={mod}"
    spark.read.parquet("/input/hdfs/path") \
        .filter(col("n0") == n0) \
        .filter(col("n1") == n1) \
        .filter(col("h1") == h1) \
        .filter(col("j1").isin(j1)) \
        .filter(col("j0") == j0) \
        .filter(col("h0").isin(h0)) \
        .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
        .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
        .checkpoint() \
        .coalesce(300) \
        .write.mode("overwrite").parquet(output_mod)


  2.  persist (to disk) + count computes the DataFrame twice.

    output_mod = f"{output}/job={mod}"
    df = spark.read.parquet("/input/hdfs/path") \
        .filter(col("n0") == n0) \
        .filter(col("n1") == n1) \
        .filter(col("h1") == h1) \
        .filter(col("j1").isin(j1)) \
        .filter(col("j0") == j0) \
        .filter(col("h0").isin(h0)) \
        .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
        .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
        .persist(StorageLevel.DISK_ONLY)
    df.count()
    df.coalesce(300).write.mode("overwrite").parquet(output_mod)


  3.  persist (to memory) + count computes the DataFrame twice

    output_mod = f"{output}/job={mod}"
    df = spark.read.parquet("/input/hdfs/path") \
        .filter(col("n0") == n0) \
        .filter(col("n1") == n1) \
        .filter(col("h1") == h1) \
        .filter(col("j1").isin(j1)) \
        .filter(col("j0") == j0) \
        .filter(col("h0").isin(h0)) \
        .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
        .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
        .persist(StorageLevel.MEMORY_ONLY)
    df.count()
    df.coalesce(300).write.mode("overwrite").parquet(output_mod)


  4.  persist (to memory) + checkpoint + coalesce computes the DataFrame twice

    output_mod = f"{output}/job={mod}"
    spark.read.parquet("/input/hdfs/path") \
        .filter(col("n0") == n0) \
        .filter(col("n1") == n1) \
        .filter(col("h1") == h1) \
        .filter(col("j1").isin(j1)) \
        .filter(col("j0") == j0) \
        .filter(col("h0").isin(h0)) \
        .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
        .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
        .persist(StorageLevel.MEMORY_ONLY) \
        .checkpoint() \
        .coalesce(300).write.mode("overwrite").parquet(output_mod)


  5.  persist (to memory) + checkpoint, without coalesce, computes the DataFrame twice

    output_mod = f"{output}/job={mod}"
    spark.read.parquet("/input/hdfs/path") \
        .filter(col("n0") == n0) \
        .filter(col("n1") == n1) \
        .filter(col("h1") == h1) \
        .filter(col("j1").isin(j1)) \
        .filter(col("j0") == j0) \
        .filter(col("h0").isin(h0)) \
        .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
        .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
        .persist(StorageLevel.MEMORY_ONLY) \
        .checkpoint() \
        .write.mode("overwrite").parquet(output_mod)


  6.  cache (equivalent to persist to MEMORY_AND_DISK) + count + coalesce computes it twice

    output_mod = f"{output}/job={mod}"
    df = spark.read.parquet("/input/hdfs/path") \
        .filter(col("n0") == n0) \
        .filter(col("n1") == n1) \
        .filter(col("h1") == h1) \
        .filter(col("j1").isin(j1)) \
        .filter(col("j0") == j0) \
        .filter(col("h0").isin(h0)) \
        .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
        .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
        .cache()
    df.count()
    df.coalesce(300).write.mode("overwrite").parquet(output_mod)



Manual output computes it only once. The function repart_hdfs below is one I wrote myself to write a DataFrame to disk, read it back, repartition/coalesce it, and then write it back to HDFS.

    spark.read.parquet("/input/hdfs/path") \
        .filter(col("n0") == n0) \
        .filter(col("n1") == n1) \
        .filter(col("h1") == h1) \
        .filter(col("j1").isin(j1)) \
        .filter(col("j0") == j0) \
        .filter(col("h0").isin(h0)) \
        .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
        .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
        .write.mode("overwrite").parquet(output_mod)
    repart_hdfs(spark, output_mod, 300, coalesce=True)
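
The implementation of repart_hdfs is not shown in the thread. Purely as an illustration of the round trip it is described as doing, a minimal sketch might look like the following. The name repart_parquet_sketch, its parameters, and the choice to write to a separate destination path (rather than replacing the source in place, since Spark generally refuses to overwrite a path it is reading from in the same job) are all assumptions.

```python
def repart_parquet_sketch(spark, src_path, dst_path, num_partitions, coalesce=False):
    """Read Parquet from src_path and rewrite it to dst_path with fewer files.

    Reading the already-written files back means the expensive UDF is not
    part of this job's lineage, so only comparatively cheap I/O is repeated.
    """
    df = spark.read.parquet(src_path)
    if coalesce:
        df = df.coalesce(num_partitions)      # merge partitions, no shuffle
    else:
        df = df.repartition(num_partitions)   # full shuffle into num_partitions
    df.write.mode("overwrite").parquet(dst_path)
    return dst_path
```

A caller would then point downstream readers at dst_path, or swap the two directories with whatever filesystem tooling is available.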












Re: A Persisted Spark DataFrame is computed twice

Posted by Sebastian Piu <se...@gmail.com>.
It's probably the repartitioning and deserialising the df that you are
seeing take time. Try doing this

1. Add another count after your current one and compare times
2. Move coalesce before persist



You should see
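
The first troubleshooting step (a second count, compared against the first) could be timed along these lines. This is only a sketch; timed and compare_counts are hypothetical helper names, and df is assumed to be the persisted DataFrame from the original post.

```python
import time


def timed(action):
    """Run a zero-argument callable and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = action()
    return result, time.perf_counter() - start


def compare_counts(df):
    """Call df.count() twice and return the two wall-clock durations.

    If the first count really materialises the persisted data, the second
    one is expected to be much faster because it only reads the cache.
    """
    _, first = timed(df.count)
    _, second = timed(df.count)
    return first, second
```

If both durations come out similar, the persisted data is probably not being reused, which is worth confirming in the Spark UI.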


Re: A Persisted Spark DataFrame is computed twice

Posted by Sebastian Piu <se...@gmail.com>.
Can you share the stages as seen in the Spark UI for the count and coalesce
jobs?

My suggestion of moving things around was just for troubleshooting rather
than a solution, if that wasn't clear before.

On Mon, 31 Jan 2022, 08:07 Benjamin Du, <le...@outlook.com> wrote:

> Removing coalesce didn't help either.

Re: A Persisted Spark DataFrame is computed twice

Posted by Benjamin Du <le...@outlook.com>.
Removing coalesce didn't help either.





Re: A Persisted Spark DataFrame is computed twice

Posted by Deepak Sharma <de...@gmail.com>.
coalesce returns a new dataset.
That will cause the recomputation.

Thanks
Deepak
