You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Zhe Dong (Jira)" <ji...@apache.org> on 2022/12/07 08:30:00 UTC

[jira] [Comment Edited] (SPARK-41386) There are some small files when using rebalance(column)

    [ https://issues.apache.org/jira/browse/SPARK-41386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17644141#comment-17644141 ] 

Zhe Dong edited comment on SPARK-41386 at 12/7/22 8:29 AM:
-----------------------------------------------------------

OptimizeSkewInRebalancePartitions.scala
{noformat}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */package org.apache.spark.sql.execution.adaptiveimport org.apache.spark.sql.execution.{CoalescedPartitionSpec, ShufflePartitionSpec, SparkPlan}
import org.apache.spark.sql.execution.exchange.{REBALANCE_PARTITIONS_BY_COL, REBALANCE_PARTITIONS_BY_NONE, ShuffleOrigin}
import org.apache.spark.sql.internal.SQLConf/**
 * A rule to optimize the skewed shuffle partitions in [[RebalancePartitions]] based on the map
 * output statistics, which can avoid data skew that hurt performance.
 *
 * We use ADVISORY_PARTITION_SIZE_IN_BYTES size to decide if a partition should be optimized.
 * Let's say we have 3 maps with 3 shuffle partitions, and assuming r1 has data skew issue.
 * the map side looks like:
 *   m0:[b0, b1, b2], m1:[b0, b1, b2], m2:[b0, b1, b2]
 * and the reduce side looks like:
 *                            (without this rule) r1[m0-b1, m1-b1, m2-b1]
 *                              /                                     \
 *   r0:[m0-b0, m1-b0, m2-b0], r1-0:[m0-b1], r1-1:[m1-b1], r1-2:[m2-b1], r2[m0-b2, m1-b2, m2-b2]
 */
object OptimizeSkewInRebalancePartitions extends AQEShuffleReadRule {  override val supportedShuffleOrigins: Seq[ShuffleOrigin] =
    Seq(REBALANCE_PARTITIONS_BY_NONE, REBALANCE_PARTITIONS_BY_COL)  /**
   * Splits the skewed partition based on the map size and the target partition size
   * after split. Create a list of `PartialReducerPartitionSpec` for skewed partition and
   * create `CoalescedPartition` for normal partition.
   */
  private def optimizeSkewedPartitions(
      shuffleId: Int,
      bytesByPartitionId: Array[Long],
      targetSize: Long,
      smallPartitionFactor: Double): Seq[ShufflePartitionSpec] = {
    bytesByPartitionId.indices.flatMap { reduceIndex =>
      val bytes = bytesByPartitionId(reduceIndex)
      if (bytes > targetSize) {
        val newPartitionSpec = ShufflePartitionsUtil.createSkewPartitionSpecs(
          shuffleId, reduceIndex, targetSize, smallPartitionFactor)
        if (newPartitionSpec.isEmpty) {
          CoalescedPartitionSpec(reduceIndex, reduceIndex + 1, bytes) :: Nil
        } else {
          logDebug(s"For shuffle $shuffleId, partition $reduceIndex is skew, " +
            s"split it into ${newPartitionSpec.get.size} parts.")
          newPartitionSpec.get
        }
      } else if (bytes < targetSize * smallPartitionFactor) {
        CoalescedPartitionSpec(reduceIndex, reduceIndex + 1, bytes) :: Nil
      } else {
        CoalescedPartitionSpec(reduceIndex, reduceIndex, bytes) :: Nil
      }
    }
  }  private def tryOptimizeSkewedPartitions(shuffle: ShuffleQueryStageExec): SparkPlan = {
    val advisorySize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
    val smallPartitionFactor =
      conf.getConf(SQLConf.ADAPTIVE_REBALANCE_PARTITIONS_SMALL_PARTITION_FACTOR)
    val mapStats = shuffle.mapStats
    if (mapStats.isEmpty ||
      mapStats.get.bytesByPartitionId.forall(
        r => r <= advisorySize && r >= advisorySize * smallPartitionFactor)) {
      return shuffle
    }    val newPartitionsSpec = optimizeSkewedPartitions(
      mapStats.get.shuffleId, mapStats.get.bytesByPartitionId, advisorySize, smallPartitionFactor)
    // return origin plan if we can not optimize partitions
    if (newPartitionsSpec.length == mapStats.get.bytesByPartitionId.length) {
      shuffle
    } else {
      AQEShuffleReadExec(shuffle, newPartitionsSpec)
    }
  }  override def apply(plan: SparkPlan): SparkPlan = {
    if (!conf.getConf(SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED)) {
      return plan
    }    plan transformUp {
      case stage: ShuffleQueryStageExec if isSupported(stage.shuffle) =>
        tryOptimizeSkewedPartitions(stage)
    }
  }
}
{noformat}
 

 


was (Author: JIRAUSER298432):
 
{noformat}
    if (mapStats.isEmpty ||
      mapStats.get.bytesByPartitionId.forall(_ <= advisorySize && _ >= advisorySize * smallPartitionFactor )) {
      return shuffle
    }

--------------------------------------------------------------------------------
      if (bytes > targetSize) {
            ... 
      } else if ( bytes < targetSize * smallPartitionFactor ){
           CoalescedPartitionSpec(reduceIndex, reduceIndex + 1, bytes) :: Nil
      }else {        
           return shuffle // dummy
       }{noformat}
 

 

> There are some small files when using rebalance(column)
> -------------------------------------------------------
>
>                 Key: SPARK-41386
>                 URL: https://issues.apache.org/jira/browse/SPARK-41386
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.3.1
>            Reporter: Zhe Dong
>            Priority: Minor
>
> *Problem ( REBALANCE(column)* {*}){*}:
>  SparkSession config:
> {noformat}
> config("spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled", "true") 
> config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "20m") 
> config("spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor", "0.5"){noformat}
> so, we except that files size should be bigger than 20m*0.5=10m at least. 
> but in fact , we got some small files like the following:
> {noformat}
> -rw-r--r--   1 jp28948 staff     12.1 M 2022-12-07 13:13 .../part-00000-1ece1aae-f4f6-47ac-abe2-170ccb61f60e.c000.snappy.parquet
> -rw-r--r--   1 jp28948 staff     12.1 M 2022-12-07 13:13 .../part-00001-1ece1aae-f4f6-47ac-abe2-170ccb61f60e.c000.snappy.parquet
> -rw-r--r--   1 jp28948 staff     12.1 M 2022-12-07 13:13 .../part-00002-1ece1aae-f4f6-47ac-abe2-170ccb61f60e.c000.snappy.parquet
> -rw-r--r--   1 jp28948 staff     12.1 M 2022-12-07 13:13 .../part-00003-1ece1aae-f4f6-47ac-abe2-170ccb61f60e.c000.snappy.parquet
> -rw-r--r--   1 jp28948 staff      9.1 M 2022-12-07 13:13 .../part-00004-1ece1aae-f4f6-47ac-abe2-170ccb61f60e.c000.snappy.parquet
> -rw-r--r--   1 jp28948 staff      3.0 M 2022-12-07 13:13 .../part-00005-1ece1aae-f4f6-47ac-abe2-170ccb61f60e.c000.snappy.parquet{noformat}
> 9.1 M and 3.0 M is smaller than 10M. we have to handle these small files in another way.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org