Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/08/09 09:59:17 UTC

[GitHub] [spark] wangyum opened a new pull request, #37451: [SPARK-40017][SQL] Adaptive adjustment `spark.sql.adaptive.advisoryPartitionSizeInBytes`

wangyum opened a new pull request, #37451:
URL: https://github.com/apache/spark/pull/37451

   ### What changes were proposed in this pull request?
   
   This PR enhances `CoalesceShufflePartitions` to adaptively adjust `spark.sql.adaptive.advisoryPartitionSizeInBytes` when an `ExpandExec` operator follows the shuffle. For example:
   ```sql
   SELECT
     item.i_brand_id brand_id,
     item.i_brand brand,
     count(distinct ss_ext_sales_price),
     count(distinct ss_addr_sk)
   FROM store_sales, item
   WHERE store_sales.ss_item_sk = item.i_item_sk
     AND item.i_manufact_id = 128
   GROUP BY item.i_brand, item.i_brand_id
   ```
   
   ### Why are the changes needed?
   
   Improve parallelism for stages in which an `ExpandExec` operator follows the shuffle.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Unit test.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wangyum commented on pull request #37451: [SPARK-40017][SQL] Adaptive adjustment `spark.sql.adaptive.advisoryPartitionSizeInBytes`

Posted by GitBox <gi...@apache.org>.
wangyum commented on PR #37451:
URL: https://github.com/apache/spark/pull/37451#issuecomment-1209352201

   cc @cloud-fan 




[GitHub] [spark] github-actions[bot] commented on pull request #37451: [SPARK-40017][SQL] Adaptive adjustment `spark.sql.adaptive.advisoryPartitionSizeInBytes`

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #37451:
URL: https://github.com/apache/spark/pull/37451#issuecomment-1328139492

   We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!




[GitHub] [spark] cloud-fan commented on a diff in pull request #37451: [SPARK-40017][SQL] Adaptive adjustment `spark.sql.adaptive.advisoryPartitionSizeInBytes`

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #37451:
URL: https://github.com/apache/spark/pull/37451#discussion_r948649576


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala:
##########
@@ -163,6 +163,16 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
       }.getOrElse(plan)
     case other => other.mapChildren(updateShuffleReads(_, specsMap))
   }
+
+  // Adjust advisory partition size in bytes based on following operators.

Review Comment:
   I agree that this rule is a bit too simple, as it assumes the final data size is the sum of all leaf shuffle data sizes. This is not true in many cases, e.g. with Join, Expand, Aggregate.
   
   I think we should fix this assumption instead of adjusting the advisory size. We should traverse the query plan and understand the lineage between the leaf shuffles and the final result. The goal is to make the result partition size 64 MB, and we need an algorithm that determines the target shuffle partition size for each leaf shuffle node, taking into account several special nodes in the query plan (join, expand, aggregate, etc.).
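
One possible reading of this suggestion is sketched below. It is a toy model rather than Spark code: `LeafShuffle`, `Operator`, the per-operator `ratio` (estimated output bytes per input byte), and `leafTargets` are all hypothetical names, and the sketch assumes that every leaf shuffle is coalesced to the same partition count and that an operator's output size is its ratio times the sum of its children's sizes.

```scala
// Toy plan model; these are NOT Spark classes.
sealed trait Plan
final case class LeafShuffle(name: String, bytes: Long) extends Plan
// ratio: hypothetical estimate of output bytes per input byte for this operator.
final case class Operator(ratio: Double, children: Seq[Plan]) extends Plan

object LineageSketch {
  // Estimate the size of the final result by following the lineage bottom-up.
  def estimatedOutputBytes(plan: Plan): Double = plan match {
    case LeafShuffle(_, bytes)     => bytes.toDouble
    case Operator(ratio, children) => ratio * children.map(estimatedOutputBytes).sum
  }

  // Collect every leaf shuffle in the plan.
  def leaves(plan: Plan): Seq[LeafShuffle] = plan match {
    case leaf: LeafShuffle     => Seq(leaf)
    case Operator(_, children) => children.flatMap(leaves)
  }

  // Pick a partition count from the estimated result size, then derive a
  // target partition size for each leaf shuffle (assumes a shared count).
  def leafTargets(plan: Plan, resultTargetBytes: Long): Map[String, Long] = {
    val numPartitions =
      math.max(1L, math.ceil(estimatedOutputBytes(plan) / resultTargetBytes).toLong)
    leaves(plan).map(leaf => leaf.name -> math.max(1L, leaf.bytes / numPartitions)).toMap
  }
}
```

In this model, an operator that doubles its input pushes each leaf's target size well below the result target, which is the effect the advisory-size adjustment in this PR tries to approximate with a single factor.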





[GitHub] [spark] wangyum commented on a diff in pull request #37451: [SPARK-40017][SQL] Adaptive adjustment `spark.sql.adaptive.advisoryPartitionSizeInBytes`

Posted by GitBox <gi...@apache.org>.
wangyum commented on code in PR #37451:
URL: https://github.com/apache/spark/pull/37451#discussion_r947399839


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala:
##########
@@ -163,6 +163,16 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
       }.getOrElse(plan)
     case other => other.mapChildren(updateShuffleReads(_, specsMap))
   }
+
+  // Adjust advisory partition size in bytes based on following operators.
+  private def adjustAdvisoryPartitionSizeInBytes(plan: SparkPlan, conf: SQLConf): Long = {
+    val postShuffleInputSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
+    val factor = 1.0 + plan.collect {
+      case e: ExpandExec => 0.5 * e.projections.size
+      case _ => 0

Review Comment:
   In our production environment, we support more operators:
   ```scala
   case _: BroadcastNestedLoopJoinExec => 4
   case SortMergeJoinExec(_, _, _, Some(condition), _, _, _) if containsInequalityPredicate(condition) => 3
   case ObjectHashAggregateExec(_, _, aggregateExpr, _, _, _, _)
     if aggregateExpr.exists {_.aggregateFunction match {
       case _: Percentile => true
       case _: ApproximatePercentile => true
       case _ => false
     }} => 2
   ```
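
To make the arithmetic concrete, here is a minimal self-contained sketch of how such weights could combine. The quoted diff is truncated, so two things are assumptions here: that the collected weights are summed into the factor, and that the advisory size is divided by that factor. The `Op` types, `weight`, and `adjusted` are hypothetical stand-ins, not Spark classes or methods.

```scala
// Toy stand-ins for physical operators; these are NOT Spark classes.
sealed trait Op
final case class Expand(projections: Int) extends Op
case object NestedLoopJoin extends Op
case object InequalitySortMergeJoin extends Op
case object PercentileAggregate extends Op
case object Other extends Op

object AdvisorySizeSketch {
  // Per-operator weights, copied from the numbers quoted in this thread.
  def weight(op: Op): Double = op match {
    case Expand(projections)     => 0.5 * projections
    case NestedLoopJoin          => 4.0
    case InequalitySortMergeJoin => 3.0
    case PercentileAggregate     => 2.0
    case Other                   => 0.0
  }

  // Assumption: weights are summed, and the advisory size is divided by
  // (1.0 + sum) so that expanding operators get smaller input partitions.
  def adjusted(advisoryBytes: Long, ops: Seq[Op]): Long = {
    val factor = 1.0 + ops.map(weight).sum
    math.max(1L, (advisoryBytes / factor).toLong)
  }
}
```

Under these assumptions, a plan with a single `Expand` of two projections has a factor of 2.0, so a 64 MB advisory size would become 32 MB; a plan with none of the listed operators keeps the configured size.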





[GitHub] [spark] maryannxue commented on pull request #37451: [SPARK-40017][SQL] Adaptive adjustment `spark.sql.adaptive.advisoryPartitionSizeInBytes`

Posted by GitBox <gi...@apache.org>.
maryannxue commented on PR #37451:
URL: https://github.com/apache/spark/pull/37451#issuecomment-1218260537

   I'm afraid this is more of an ad-hoc change specific to one type of query or scenario. The heuristics might not work well in other cases.




[GitHub] [spark] HyukjinKwon commented on pull request #37451: [SPARK-40017][SQL] Adaptive adjustment `spark.sql.adaptive.advisoryPartitionSizeInBytes`

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on PR #37451:
URL: https://github.com/apache/spark/pull/37451#issuecomment-1210013200

   @maryannxue FYI




[GitHub] [spark] github-actions[bot] closed pull request #37451: [SPARK-40017][SQL] Adaptive adjustment `spark.sql.adaptive.advisoryPartitionSizeInBytes`

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed pull request #37451: [SPARK-40017][SQL] Adaptive adjustment `spark.sql.adaptive.advisoryPartitionSizeInBytes`
URL: https://github.com/apache/spark/pull/37451

