Posted to issues@spark.apache.org by "koert kuipers (Jira)" <ji...@apache.org> on 2020/06/22 19:38:00 UTC

[jira] [Created] (SPARK-32056) Repartition by key should support partition coalesce for AQE

koert kuipers created SPARK-32056:
-------------------------------------

             Summary: Repartition by key should support partition coalesce for AQE
                 Key: SPARK-32056
                 URL: https://issues.apache.org/jira/browse/SPARK-32056
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.0.0
         Environment: spark release 3.0.0
            Reporter: koert kuipers


When adaptive query execution (AQE) is enabled, the following expression should support coalescing of partitions:
{code:java}
dataframe.repartition(col("somecolumn")) {code}
Currently it does not, because it simply calls the repartition overload that takes an explicit number of partitions, passing in numShufflePartitions:
{code:java}
  def repartition(partitionExprs: Column*): Dataset[T] = {
    repartition(sparkSession.sessionState.conf.numShufflePartitions, partitionExprs: _*)
  }{code}
Repartition with an explicit number of partitions does not allow coalescing of partitions, since coalescing would break the user's expectation of getting exactly the number of partitions specified.
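To make the mechanism concrete, here is a minimal, simplified model in plain Scala with no Spark dependency. It is not Spark's implementation: the names CoalesceModel, RepartitionRequest, coalesce and finalPartitionCount are hypothetical, and coalesce is an assumed greedy packing of contiguous shuffle partitions up to a target size, only in the spirit of AQE's coalescing. What it illustrates is the point above: once a partition count is pinned, coalescing is bypassed entirely.

```scala
// Hypothetical, simplified model of the behavior described in this issue.
// This is NOT Spark's code; names and the packing rule are assumptions.
object CoalesceModel {

  // Hypothetical stand-in for a repartition request:
  // None    = only partitioning expressions were given (coalescing allowed)
  // Some(n) = an explicit partition count was given (must be honored as-is)
  final case class RepartitionRequest(numPartitions: Option[Int])

  // Greedily packs contiguous shuffle partitions into groups whose total
  // size stays at or below targetSize. A partition larger than targetSize
  // ends up alone in its own group. Returns groups of original indices.
  def coalesce(sizes: Seq[Long], targetSize: Long): Seq[Seq[Int]] = {
    val init = (Vector.empty[Vector[Int]], Vector.empty[Int], 0L)
    val (done, last, _) = sizes.zipWithIndex.foldLeft(init) {
      case ((groups, current, currentSize), (size, idx)) =>
        if (current.nonEmpty && currentSize + size > targetSize)
          // adding this partition would exceed the target: close the group
          (groups :+ current, Vector(idx), size)
        else
          (groups, current :+ idx, currentSize + size)
    }
    if (last.nonEmpty) done :+ last else done
  }

  // Number of post-shuffle partitions under this model: coalescing only
  // applies when the user did not pin the partition count.
  def finalPartitionCount(req: RepartitionRequest, sizes: Seq[Long], targetSize: Long): Int =
    req.numPartitions match {
      case Some(n) => n                                // explicit count: never coalesced
      case None    => coalesce(sizes, targetSize).length
    }
}
```

Under this model, the Spark 3.0.0 behavior of repartition(col("somecolumn")) corresponds to RepartitionRequest(Some(numShufflePartitions)), so 50 tiny partitions stay 50; the issue asks for it to behave like RepartitionRequest(None), where they could be packed into far fewer.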

For more context, see the discussion here:

[https://github.com/apache/spark/pull/27986]

A simple test confirming that repartition by key does not support coalescing of partitions can be added to AdaptiveQueryExecSuite as follows (it currently fails):
{code:java}
  test("SPARK-????? repartition has fewer partitions for small data when adaptiveExecutionEnabled") {
    Seq(true, false).foreach { enableAQE =>
      withSQLConf(
        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString,
        SQLConf.SHUFFLE_PARTITIONS.key -> "50",
        SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "50") {
        // Repartition by key only: no explicit partition count is given.
        val partitionsNum = (1 to 10).toDF.repartition($"value")
          .rdd.collectPartitions().length
        if (enableAQE) {
          // With AQE, the tiny shuffle output should be coalesced.
          assert(partitionsNum < 50)
        } else {
          // Without AQE, spark.sql.shuffle.partitions (50) is used as-is.
          assert(partitionsNum === 50)
        }
      }
    }
  }
{code}


