Posted to dev@spark.apache.org by Haejoon Lee <ha...@databricks.com> on 2021/08/30 05:59:19 UTC

Question using multiple partition for Window cumulative functions when partition is not specified.

Hi all,

I noticed that Spark uses only one partition when it evaluates a cumulative
Window function with no partition specified, so the entire dataset is moved
into a single partition, which can easily cause OOM errors or serious
performance degradation.

See the example below:

>>> from pyspark.sql import functions as F, Window
>>> sdf = spark.range(10)
>>> sdf.select(F.sum(sdf["id"]).over(Window.rowsBetween(Window.unboundedPreceding, Window.currentRow))).show()
...
WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
...
+---------------------------------------------------------------+
|sum(id) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)|
+---------------------------------------------------------------+
|                                                              0|
|                                                              1|
|                                                              3|
|                                                              6|
|                                                             10|
|                                                             15|
|                                                             21|
|                                                             28|
|                                                             36|
|                                                             45|
+---------------------------------------------------------------+

As the example shows, a cumulative window function needs the result for the
previous row in order to compute the next one. When no partition is
specified, Spark handles this by simply moving all the data to one
partition.

To overcome this, Dask, for example, introduces the concept of
Overlapping Computations <https://docs.dask.org/en/latest/array-overlap.html>,
which copies the dataset into multiple blocks and applies the cumulative
function to them sequentially when the dataset exceeds the memory size.

Of course, this method adds the cost of creating the copies and of
communicating between blocks, but it makes it possible to run cumulative
functions even when the dataset is larger than memory, instead of failing
with an OOM error.

So this is only a way to resolve the out-of-memory issue; it does not bring
any performance advantage.
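
As a rough illustration of the block-wise idea, here is a minimal sketch in
plain Python. It is not Dask's actual implementation and not something Spark
does today; the helper name blockwise_cumsum and the block_size parameter are
hypothetical, and the list slices only stand in for partitions. Each block
first reports its total, the totals are turned into per-block offsets, and
then each block computes its own running sum and adds its offset.

```python
from itertools import accumulate


def blockwise_cumsum(values, block_size):
    """Cumulative sum computed block by block (hypothetical helper)."""
    if block_size <= 0:
        raise ValueError("block_size must be positive")
    # Split the input into consecutive blocks, standing in for partitions.
    blocks = [values[i:i + block_size] for i in range(0, len(values), block_size)]
    # Pass 1: each block only reports its total, which is cheap to collect.
    totals = [sum(block) for block in blocks]
    # Offset of a block = sum of all blocks before it (exclusive prefix sum).
    offsets = [0] + list(accumulate(totals))[:-1]
    # Pass 2: each block computes its local running sum and adds its offset.
    result = []
    for block, offset in zip(blocks, offsets):
        result.extend(offset + s for s in accumulate(block))
    return result


print(blockwise_cumsum(list(range(10)), block_size=4))
```

With the ten ids from the example above and a block size of 4, this should
reproduce the same running totals as the single-partition window. Note that
the sketch relies on the blocks having a well-defined order, which in Spark
would have to come from an explicit ordering of the rows.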

I think this kind of use case is fairly common in data science, but I
wonder how frequent it is in Apache Spark.

Would it be helpful to implement this approach in Apache Spark for cumulative
Window functions on larger-than-memory data when no partition is specified?

See <https://github.com/databricks/koalas/issues/1386>, where the issue was
first raised, for more detail.


Best,

Haejoon.

Re: Question using multiple partition for Window cumulative functions when partition is not specified.

Posted by angers zhu <an...@gmail.com>.
Hi Haejoon,

I think you can check the discussion in
https://github.com/apache/spark/pull/27861

Best regards
Angers

Re: Question using multiple partition for Window cumulative functions when partition is not specified.

Posted by Sean Owen <sr...@gmail.com>.
You just have 1 partition here because the input is so small. You can
always repartition this further for parallelism.
Is the issue that you're not partitioning the window itself, maybe?
