Posted to issues@spark.apache.org by "Apache Spark (Jira)" <ji...@apache.org> on 2020/06/23 02:57:00 UTC

[jira] [Assigned] (SPARK-32056) Repartition by key should support partition coalesce for AQE

     [ https://issues.apache.org/jira/browse/SPARK-32056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-32056:
------------------------------------

    Assignee:     (was: Apache Spark)

> Repartition by key should support partition coalesce for AQE
> ------------------------------------------------------------
>
>                 Key: SPARK-32056
>                 URL: https://issues.apache.org/jira/browse/SPARK-32056
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.0.0
>         Environment: spark release 3.0.0
>            Reporter: koert kuipers
>            Priority: Minor
>
> When adaptive query execution (AQE) is enabled, the following expression should support coalescing of partitions:
> {code:java}
> dataframe.repartition(col("somecolumn")) {code}
> Currently it does not, because it simply calls the repartition overload that takes an explicit number of partitions:
> {code:java}
>   def repartition(partitionExprs: Column*): Dataset[T] = {
>     repartition(sparkSession.sessionState.conf.numShufflePartitions, partitionExprs: _*)
>   }{code}
> and repartition with an explicit number of partitions does not allow coalescing of partitions (since coalescing would break the user's expectation that the result has exactly the number of partitions specified).
> For more context, see the discussion here:
> [https://github.com/apache/spark/pull/27986]
> A simple test to confirm that repartition by key does not support coalescing of partitions can be added to AdaptiveQueryExecSuite like this (it currently fails):
> {code:java}
>   test("SPARK-32056 repartition has less partitions for small data when adaptiveExecutionEnabled") {
>     Seq(true, false).foreach { enableAQE =>
>       withSQLConf(
>         SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString,
>         SQLConf.SHUFFLE_PARTITIONS.key -> "50",
>         SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "50") {
>         val partitionsNum = (1 to 10).toDF.repartition($"value")
>           .rdd.collectPartitions().length
>         if (enableAQE) {
>           assert(partitionsNum < 50)
>         } else {
>           assert(partitionsNum === 50)
>         }
>       }
>     }
>   }
> {code}
>  
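
To make the expected behaviour in the description above concrete, here is a minimal sketch of what "coalescing" means for the test case: adjacent small shuffle partitions are merged until a target size would be exceeded. This is a simplified model in plain Scala, not Spark's actual implementation; the names CoalesceSketch, coalesce and targetSize, and the byte sizes used, are hypothetical and chosen only for illustration. Spark's real rule applies further conditions that are not modelled here.

```scala
// Simplified model of partition coalescing. NOT Spark's actual code;
// all names and sizes here are hypothetical.
object CoalesceSketch {

  /** Greedily merges adjacent partitions while the merged size stays
    * within targetSize. Returns the start index of each merged group. */
  def coalesce(sizes: Seq[Long], targetSize: Long): Seq[Int] = {
    val starts = scala.collection.mutable.ArrayBuffer.empty[Int]
    var current = 0L
    for ((size, i) <- sizes.zipWithIndex) {
      if (i == 0 || current + size > targetSize) {
        // Open a new group: either the first partition, or the current
        // group would grow past the target if this partition were added.
        starts += i
        current = size
      } else {
        current += size
      }
    }
    starts.toList
  }

  def main(args: Array[String]): Unit = {
    // 50 shuffle partitions; every fifth one holds 10 bytes, the rest are
    // empty (assumed sizes, loosely mirroring 10 rows spread over 50 partitions).
    val sizes = Seq.tabulate(50)(i => if (i % 5 == 0) 10L else 0L)
    val groups = coalesce(sizes, targetSize = 64L)
    println(s"partitions before: ${sizes.length}, after: ${groups.length}")
  }
}
```

With a fixed partition count, as in the repartition overload quoted in the description, no such merging is permitted, which is why the test sees 50 partitions even for ten rows.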


