Posted to user@spark.apache.org by Gourav Sengupta <go...@gmail.com> on 2022/02/01 08:24:24 UTC

Re: A Persisted Spark DataFrame is computed twice

Hi,

Can you please try using Spark SQL instead of DataFrames and see the
difference?

You will get a lot of theoretical arguments, and that is fine, but they are
largely just theories.

Also try applying the function to the result of the filters as a sub-query,
after first caching the filtered data.
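
A minimal sketch of how that could look in PySpark. It is only an
illustration under assumptions: the function name cache_then_score and the
view names src and filtered are hypothetical, and test_score_r4 is assumed
to be already registered for SQL use (for example with spark.udf.register).
None of this is taken from the code in this thread, and whether it avoids
the second computation should be confirmed in the Spark UI.

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:  # imported for type hints only
    from pyspark.sql import DataFrame, SparkSession


def cache_then_score(spark: "SparkSession", input_path: str,
                     filter_sql: str, score_sql: str) -> "DataFrame":
    """Cache the filtered rows, then apply the function in a sub-query.

    All names here are hypothetical. `score_sql` is assumed to call a function
    already registered for SQL, e.g. "test_score_r4(id0, id1)".
    """
    spark.read.parquet(input_path).createOrReplaceTempView("src")

    # CACHE TABLE ... AS SELECT is eager unless LAZY is given, so the filters
    # are evaluated and stored here, before the function is applied.
    spark.sql(f"CACHE TABLE filtered AS SELECT * FROM src WHERE {filter_sql}")

    # The function runs over the cached filter result as a sub-query.
    return spark.sql(
        f"SELECT f.*, {score_sql} AS test FROM (SELECT * FROM filtered) AS f"
    )
```

The returned DataFrame can then be written with the same
coalesce(300).write.mode("overwrite").parquet(...) call as before, and
spark.sql("UNCACHE TABLE filtered") releases the cache afterwards.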



Regards,
Gourav Sengupta

On Mon, Jan 31, 2022 at 8:00 AM Benjamin Du <le...@outlook.com> wrote:

> I don't think coalesce (by repartitioning I assume you mean coalesce)
> itself and deserialising take that much time. To add a little more
> context, computing the DataFrame is CPU-intensive rather than
> data/IO-intensive. I purposely keep coalesce after df.count because I want
> to keep the large number of partitions (30k) when computing the DataFrame
> so that I can get much higher parallelism. After the computation, I
> reduce the number of partitions (to avoid having too many small files on
> HDFS). It typically takes about 5 hours to compute the DataFrame (when 30k
> partitions are used) and write it to disk (without doing repartitioning or
> coalesce). If I manually write the computed DataFrame to disk, read it
> back, coalesce it and then write it back to disk, it also takes about 5
> hours. The code that I pasted in this thread takes forever to run because
> the DataFrame is obviously recomputed at df.coalesce, and with a
> parallelism of only 300 partitions it is almost impossible to compute the
> DataFrame in a reasonable amount of time.
>
> I tried various ways, but none of them worked except manually writing to
> disk, reading it back, repartitioning/coalescing it, and then writing it
> back to HDFS.
>
>    1. checkpoint by itself computes the DataFrame twice. (This is a known
>    bug of checkpoint.)
>
>     output_mod = f"{output}/job={mod}"
>     spark.read.parquet("/input/hdfs/path") \
>         .filter(col("n0") == n0) \
>         .filter(col("n1") == n1) \
>         .filter(col("h1") == h1) \
>         .filter(col("j1").isin(j1)) \
>         .filter(col("j0") == j0) \
>         .filter(col("h0").isin(h0)) \
>         .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
>         .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
>         .checkpoint() \
>         .coalesce(300) \
>         .write.mode("overwrite").parquet(output_mod)
>
>
>    2. persist (to disk) + count computes the DataFrame twice.
>
>     output_mod = f"{output}/job={mod}"
>     df = spark.read.parquet("/input/hdfs/path") \
>         .filter(col("n0") == n0) \
>         .filter(col("n1") == n1) \
>         .filter(col("h1") == h1) \
>         .filter(col("j1").isin(j1)) \
>         .filter(col("j0") == j0) \
>         .filter(col("h0").isin(h0)) \
>         .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
>         .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
>         .persist(StorageLevel.DISK_ONLY)
>     df.count()
>     df.coalesce(300).write.mode("overwrite").parquet(output_mod)
>
>
>    3. persist (to memory) + count computes the DataFrame twice
>
>     output_mod = f"{output}/job={mod}"
>     df = spark.read.parquet("/input/hdfs/path") \
>         .filter(col("n0") == n0) \
>         .filter(col("n1") == n1) \
>         .filter(col("h1") == h1) \
>         .filter(col("j1").isin(j1)) \
>         .filter(col("j0") == j0) \
>         .filter(col("h0").isin(h0)) \
>         .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
>         .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
>         .persist(StorageLevel.MEMORY_ONLY)
>     df.count()
>     df.coalesce(300).write.mode("overwrite").parquet(output_mod)
>
>
>    4. persist (to memory) + checkpoint + coalesce computes the DataFrame
>    twice
>
>     output_mod = f"{output}/job={mod}"
>     df = spark.read.parquet("/input/hdfs/path") \
>         .filter(col("n0") == n0) \
>         .filter(col("n1") == n1) \
>         .filter(col("h1") == h1) \
>         .filter(col("j1").isin(j1)) \
>         .filter(col("j0") == j0) \
>         .filter(col("h0").isin(h0)) \
>         .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
>         .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
>         .persist(StorageLevel.MEMORY_ONLY) \
>         .checkpoint() \
>         .coalesce(300).write.mode("overwrite").parquet(output_mod)
>
>
>    5. persist (to memory) + checkpoint, without coalesce, computes the
>    DataFrame twice
>
>     output_mod = f"{output}/job={mod}"
>     df = spark.read.parquet("/input/hdfs/path") \
>         .filter(col("n0") == n0) \
>         .filter(col("n1") == n1) \
>         .filter(col("h1") == h1) \
>         .filter(col("j1").isin(j1)) \
>         .filter(col("j0") == j0) \
>         .filter(col("h0").isin(h0)) \
>         .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
>         .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
>         .persist(StorageLevel.MEMORY_ONLY) \
>         .checkpoint() \
>         .write.mode("overwrite").parquet(output_mod)
>
>
>    6. cache (equivalent to persist to MEMORY_AND_DISK) + count + coalesce
>    computes it twice
>
>     output_mod = f"{output}/job={mod}"
>     df = spark.read.parquet("/input/hdfs/path") \
>         .filter(col("n0") == n0) \
>         .filter(col("n1") == n1) \
>         .filter(col("h1") == h1) \
>         .filter(col("j1").isin(j1)) \
>         .filter(col("j0") == j0) \
>         .filter(col("h0").isin(h0)) \
>         .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
>         .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
>         .cache()
>     df.count()
>     df.coalesce(300).write.mode("overwrite").parquet(output_mod)
>
>
> Manual output computes it only once. The function repart_hdfs below is a
> function I wrote to write a DataFrame to disk, read it back,
> repartition/coalesce it, and then write it back to HDFS.
>
>     spark.read.parquet("/input/hdfs/path") \
>         .filter(col("n0") == n0) \
>         .filter(col("n1") == n1) \
>         .filter(col("h1") == h1) \
>         .filter(col("j1").isin(j1)) \
>         .filter(col("j0") == j0) \
>         .filter(col("h0").isin(h0)) \
>         .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
>         .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
>         .write.mode("overwrite").parquet(output_mod)
>     repart_hdfs(spark, output_mod, 300, coalesce=True)
>
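
The repart_hdfs helper itself is not shown in this thread. A rough sketch
of what a write / read-back / coalesce helper might look like follows. The
function name write_then_compact, its signature, and the "_staging" path
suffix are all hypothetical and are not the author's implementation.

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:  # imported for type hints only
    from pyspark.sql import DataFrame, SparkSession


def write_then_compact(spark: "SparkSession", df: "DataFrame",
                       output_path: str, num_files: int,
                       staging_suffix: str = "_staging") -> str:
    """Write df at its current partitioning, then compact it in a second job.

    Hypothetical helper: returns the staging path so the caller can delete it.
    """
    staging_path = output_path.rstrip("/") + staging_suffix

    # First job: the expensive computation runs once, with however many
    # partitions df already has (30k in the thread above).
    df.write.mode("overwrite").parquet(staging_path)

    # Second job: reads finished Parquet files only, so nothing upstream is
    # recomputed when the partition count is reduced.
    (spark.read.parquet(staging_path)
        .coalesce(num_files)
        .write.mode("overwrite")
        .parquet(output_path))

    return staging_path
```

A call such as write_then_compact(spark, df, output_mod, 300) would
correspond to the manual approach described above. Removing the staging
directory afterwards is left to the caller.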
>
> Best,
>
> ----
>
> Ben Du
>
> Personal Blog <http://www.legendu.net/> | GitHub
> <https://github.com/dclong/> | Bitbucket <https://bitbucket.org/dclong/>
> | Docker Hub <https://hub.docker.com/r/dclong/>
> ------------------------------
> *From:* Sebastian Piu <se...@gmail.com>
> *Sent:* Sunday, January 30, 2022 12:44 AM
> *To:* Benjamin Du <le...@outlook.com>
> *Cc:* user@spark.incubator.apache.org <us...@spark.incubator.apache.org>
> *Subject:* Re: A Persisted Spark DataFrame is computed twice
>
> It's probably the repartitioning and deserialising the df that you are
> seeing take time. Try doing this
>
> 1. Add another count after your current one and compare times
> 2. Move coalesce before persist
>
>
>
> You should see
>
> On Sun, 30 Jan 2022, 08:37 Benjamin Du, <le...@outlook.com> wrote:
>
> I have some PySpark code like below. Basically, I persist a DataFrame
> (which is time-consuming to compute) to disk, call the method
> DataFrame.count to trigger the caching/persist immediately, and then I
> coalesce the DataFrame to reduce the number of partitions (the original
> DataFrame has 30,000 partitions) and output it to HDFS. Based on the
> execution time of job stages and the execution plan, it seems to me that
> the DataFrame is recomputed at df.coalesce(300). Does anyone know why
> this happens?
>
> df = spark.read.parquet("/input/hdfs/path") \
>     .filter(...) \
>     .withColumn("new_col", my_pandas_udf("col0", "col1")) \
>     .persist(StorageLevel.DISK_ONLY)
> df.count()
> df.coalesce(300).write.mode("overwrite").parquet(output_mod)
>
>
> BTW, it works well if I manually write the DataFrame to HDFS, read it
> back, coalesce it and write it back to HDFS.
> Originally posted at
> <https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice>.
>
> Best,
>
> ----
>
> Ben Du
>
> Personal Blog <http://www.legendu.net/> | GitHub
> <https://github.com/dclong/> | Bitbucket <https://bitbucket.org/dclong/>
> | Docker Hub <https://hub.docker.com/r/dclong/>
>
>