Posted to dev@spark.apache.org by mhawes <ha...@gmail.com> on 2021/05/11 22:18:31 UTC

Re: [Spark Core]: Adding support for size based partition coalescing

Hi angers.zhu,

Reviving this thread to say that, while it's not ideal (as it recomputes the
last stage), I think the `SizeBasedCoalescer` solution is a good
option. If you don't mind re-raising that PR, that would be great.
Alternatively, I'm happy to open a PR based on your previous one.

What do you think?

Matt



--
Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscribe@spark.apache.org


Re: [Spark Core]: Adding support for size based partition coalescing

Posted by Wenchen Fan <cl...@gmail.com>.
Without AQE, repartition() simply creates 200 partitions (the value of
spark.sql.shuffle.partitions), AFAIK. AQE helps by coalescing those
partitions into a reasonable number, based on size. Note that you need to
set spark.sql.shuffle.partitions high enough, as AQE cannot increase the
number of partitions; it can only coalesce them.
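
To make the coalescing behaviour concrete, here is a rough, Spark-free
sketch in plain Java. It is a simplified model, not Spark's actual
implementation: the class name `CoalesceSketch`, the method `coalesce`, and
the 64 MiB target are all hypothetical and chosen only for illustration. It
greedily merges adjacent partitions until adding the next one would exceed
the target size, and it never splits a partition, which is why the initial
partition count has to be large enough.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of size-based coalescing; NOT Spark's actual implementation.
final class CoalesceSketch {

    // Greedily merges adjacent partitions until adding the next one would
    // exceed targetBytes. Returns groups of original partition indices.
    // A single partition larger than targetBytes is kept as-is (never split).
    static List<List<Integer>> coalesce(long[] sizes, long targetBytes) {
        List<List<Integer>> groups = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        long currentBytes = 0L;
        for (int i = 0; i < sizes.length; i++) {
            // Close the current group if this partition would overflow it.
            if (!current.isEmpty() && currentBytes + sizes[i] > targetBytes) {
                groups.add(current);
                current = new ArrayList<>();
                currentBytes = 0L;
            }
            current.add(i);
            currentBytes += sizes[i];
        }
        if (!current.isEmpty()) {
            groups.add(current);
        }
        return groups;
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024;
        long target = 64 * mib; // hypothetical target size

        // Five small partitions are merged into two groups.
        long[] small = {10 * mib, 20 * mib, 30 * mib, 40 * mib, 5 * mib};
        System.out.println(coalesce(small, target));

        // One oversized partition stays a single partition: merging can
        // only reduce the partition count, never increase it.
        long[] oversized = {200 * mib};
        System.out.println(coalesce(oversized, target));
    }
}
```

Running `main` prints `[[0, 1, 2], [3, 4]]` followed by `[[0]]`: the small
partitions are grouped up to the target, while the 200 MiB partition cannot
be broken up after the fact.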

On Tue, May 25, 2021 at 2:35 AM Tom Graves <tg...@yahoo.com> wrote:

> So repartition() would look at some other config (
> spark.sql.adaptive.advisoryPartitionSizeInBytes) to decide the partition
> size to target? Does it require AQE? If so, what does a repartition() call
> do if AQE is not enabled? This is essentially a new API, so would
> repartitionBySize or something similar be less confusing to users who
> already use repartition(num_partitions)?
>
> Tom
>
> On Monday, May 24, 2021, 12:30:20 PM CDT, Wenchen Fan <cl...@gmail.com>
> wrote:
>
>
> Ideally this should be handled by the underlying data source to produce a
> reasonably partitioned RDD as the input data. However if we already have a
> poorly partitioned RDD at hand and want to repartition it properly, I think
> an extra shuffle is required so that we can know the partition size first.
>
> That said, I think calling `.repartition()` with no args is indeed a good
> solution for this problem.
>
> On Sat, May 22, 2021 at 1:12 AM mhawes <ha...@gmail.com> wrote:
>
> Adding /another/ update to say that I'm currently planning on using a
> recently introduced feature whereby calling `.repartition()` with no args
> will cause the dataset to be optimised by AQE. This actually suits our
> use-case perfectly!
>
> Example:
>
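>         // Enable AQE so that a no-arg repartition() is coalesced by size.
>         sparkSession.conf().set("spark.sql.adaptive.enabled", "true");
>         // range(start, end, step, numPartitions): 3 rows in 4 input partitions.
>         Dataset<Long> dataset = sparkSession.range(1, 4, 1, 4).repartition();
>
>         assertThat(dataset.rdd().collectPartitions().length).isEqualTo(1); // true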
>
>
> Relevant PRs/Issues:
> [SPARK-31220][SQL] repartition obeys initialPartitionNum when
> adaptiveExecutionEnabled https://github.com/apache/spark/pull/27986
> [SPARK-32056][SQL] Coalesce partitions for repartition by expressions when
> AQE is enabled https://github.com/apache/spark/pull/28900
> [SPARK-32056][SQL][Follow-up] Coalesce partitions for repartition hint and
> sql when AQE is enabled https://github.com/apache/spark/pull/28952
>

Re: [Spark Core]: Adding support for size based partition coalescing

Posted by Tom Graves <tg...@yahoo.com.INVALID>.
 So repartition() would look at some other config (spark.sql.adaptive.advisoryPartitionSizeInBytes) to decide the partition size to target? Does it require AQE? If so, what does a repartition() call do if AQE is not enabled? This is essentially a new API, so would repartitionBySize or something similar be less confusing to users who already use repartition(num_partitions)?
Tom
    On Monday, May 24, 2021, 12:30:20 PM CDT, Wenchen Fan <cl...@gmail.com> wrote:  
 
 Ideally this should be handled by the underlying data source to produce a reasonably partitioned RDD as the input data. However if we already have a poorly partitioned RDD at hand and want to repartition it properly, I think an extra shuffle is required so that we can know the partition size first.
That said, I think calling `.repartition()` with no args is indeed a good solution for this problem.
On Sat, May 22, 2021 at 1:12 AM mhawes <ha...@gmail.com> wrote:

Adding /another/ update to say that I'm currently planning on using a
recently introduced feature whereby calling `.repartition()` with no args
will cause the dataset to be optimised by AQE. This actually suits our
use-case perfectly!

Example:

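        // Enable AQE so that a no-arg repartition() is coalesced by size.
        sparkSession.conf().set("spark.sql.adaptive.enabled", "true");
        // range(start, end, step, numPartitions): 3 rows in 4 input partitions.
        Dataset<Long> dataset = sparkSession.range(1, 4, 1, 4).repartition();

        assertThat(dataset.rdd().collectPartitions().length).isEqualTo(1); // true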


Relevant PRs/Issues:
[SPARK-31220][SQL] repartition obeys initialPartitionNum when
adaptiveExecutionEnabled https://github.com/apache/spark/pull/27986
[SPARK-32056][SQL] Coalesce partitions for repartition by expressions when
AQE is enabled https://github.com/apache/spark/pull/28900
[SPARK-32056][SQL][Follow-up] Coalesce partitions for repartition hint and
sql when AQE is enabled https://github.com/apache/spark/pull/28952





  

Re: [Spark Core]: Adding support for size based partition coalescing

Posted by Wenchen Fan <cl...@gmail.com>.
Ideally this should be handled by the underlying data source to produce a
reasonably partitioned RDD as the input data. However if we already have a
poorly partitioned RDD at hand and want to repartition it properly, I think
an extra shuffle is required so that we can know the partition size first.

That said, I think calling `.repartition()` with no args is indeed a good
solution for this problem.

On Sat, May 22, 2021 at 1:12 AM mhawes <ha...@gmail.com> wrote:

> Adding /another/ update to say that I'm currently planning on using a
> recently introduced feature whereby calling `.repartition()` with no args
> will cause the dataset to be optimised by AQE. This actually suits our
> use-case perfectly!
>
> Example:
>
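>         // Enable AQE so that a no-arg repartition() is coalesced by size.
>         sparkSession.conf().set("spark.sql.adaptive.enabled", "true");
>         // range(start, end, step, numPartitions): 3 rows in 4 input partitions.
>         Dataset<Long> dataset = sparkSession.range(1, 4, 1, 4).repartition();
>
>         assertThat(dataset.rdd().collectPartitions().length).isEqualTo(1); // true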
>
>
> Relevant PRs/Issues:
> [SPARK-31220][SQL] repartition obeys initialPartitionNum when
> adaptiveExecutionEnabled https://github.com/apache/spark/pull/27986
> [SPARK-32056][SQL] Coalesce partitions for repartition by expressions when
> AQE is enabled https://github.com/apache/spark/pull/28900
> [SPARK-32056][SQL][Follow-up] Coalesce partitions for repartition hint and
> sql when AQE is enabled https://github.com/apache/spark/pull/28952
>

Re: [Spark Core]: Adding support for size based partition coalescing

Posted by mhawes <ha...@gmail.com>.
Adding /another/ update to say that I'm currently planning on using a
recently introduced feature whereby calling `.repartition()` with no args
will cause the dataset to be optimised by AQE. This actually suits our
use-case perfectly!

Example:

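        // Enable AQE so that a no-arg repartition() is coalesced by size.
        sparkSession.conf().set("spark.sql.adaptive.enabled", "true");
        // range(start, end, step, numPartitions): 3 rows in 4 input partitions.
        Dataset<Long> dataset = sparkSession.range(1, 4, 1, 4).repartition();

        assertThat(dataset.rdd().collectPartitions().length).isEqualTo(1); // true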


Relevant PRs/Issues:
[SPARK-31220][SQL] repartition obeys initialPartitionNum when
adaptiveExecutionEnabled https://github.com/apache/spark/pull/27986
[SPARK-32056][SQL] Coalesce partitions for repartition by expressions when
AQE is enabled https://github.com/apache/spark/pull/28900
[SPARK-32056][SQL][Follow-up] Coalesce partitions for repartition hint and
sql when AQE is enabled https://github.com/apache/spark/pull/28952


