Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/08/09 09:59:17 UTC
[GitHub] [spark] wangyum opened a new pull request, #37451: [SPARK-40017][SQL] Adaptive adjustment `spark.sql.adaptive.advisoryPartitionSizeInBytes`
wangyum opened a new pull request, #37451:
URL: https://github.com/apache/spark/pull/37451
### What changes were proposed in this pull request?
This PR enhances `CoalesceShufflePartitions` to adaptively adjust `spark.sql.adaptive.advisoryPartitionSizeInBytes` according to any `ExpandExec` operator that follows the shuffle. For example:
```sql
SELECT
  item.i_brand_id brand_id,
  item.i_brand brand,
  count(distinct ss_ext_sales_price),
  count(distinct ss_addr_sk)
FROM store_sales, item
WHERE store_sales.ss_item_sk = item.i_item_sk
  AND item.i_manufact_id = 128
GROUP BY item.i_brand, item.i_brand_id
```
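To illustrate why an `ExpandExec` matters here: a query with two `count(distinct ...)` expressions is typically planned with an Expand step that emits each input row once per distinct group, so the data after the shuffle read can be roughly twice the shuffle size. The following is a minimal, self-contained sketch of that row multiplication. It does not use Spark classes; `Row`, `Expanded`, `projections`, and `expand` are hypothetical names chosen for illustration.

```scala
// Simplified model of the Expand step for two DISTINCT aggregates.
// None of these names are Spark APIs; they only mimic the behaviour.
object ExpandSketch {
  // One joined input row: the grouping key plus the two DISTINCT columns.
  final case class Row(brand: String, price: Option[Double], addrSk: Option[Long])

  // One output row per projection; gid tags the distinct group the row feeds.
  final case class Expanded(brand: String, price: Option[Double], addrSk: Option[Long], gid: Int)

  // Each projection keeps one DISTINCT column and nulls out the other.
  val projections: Seq[Row => Expanded] = Seq(
    r => Expanded(r.brand, r.price, None, gid = 1),
    r => Expanded(r.brand, None, r.addrSk, gid = 2)
  )

  // Every input row is emitted once per projection.
  def expand(rows: Seq[Row]): Seq[Expanded] =
    rows.flatMap(r => projections.map(p => p(r)))

  def main(args: Array[String]): Unit = {
    val input = Seq(Row("a", Some(1.5), Some(10L)), Row("b", Some(2.0), Some(11L)))
    val output = expand(input)
    // prints "2 input rows -> 4 expanded rows"
    println(s"${input.size} input rows -> ${output.size} expanded rows")
  }
}
```

With two projections the row count doubles, which is the kind of growth the PR tries to account for when choosing the partition size.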
### Why are the changes needed?
Improve parallelism.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit test.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] wangyum commented on pull request #37451: [SPARK-40017][SQL] Adaptive adjustment `spark.sql.adaptive.advisoryPartitionSizeInBytes`
Posted by GitBox <gi...@apache.org>.
wangyum commented on PR #37451:
URL: https://github.com/apache/spark/pull/37451#issuecomment-1209352201
cc @cloud-fan
[GitHub] [spark] github-actions[bot] commented on pull request #37451: [SPARK-40017][SQL] Adaptive adjustment `spark.sql.adaptive.advisoryPartitionSizeInBytes`
Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #37451:
URL: https://github.com/apache/spark/pull/37451#issuecomment-1328139492
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!
[GitHub] [spark] cloud-fan commented on a diff in pull request #37451: [SPARK-40017][SQL] Adaptive adjustment `spark.sql.adaptive.advisoryPartitionSizeInBytes`
Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #37451:
URL: https://github.com/apache/spark/pull/37451#discussion_r948649576
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala:
##########
@@ -163,6 +163,16 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
}.getOrElse(plan)
case other => other.mapChildren(updateShuffleReads(_, specsMap))
}
+
+ // Adjust advisory partition size in bytes based on following operators.
Review Comment:
I agree that this rule is a bit too simple, as it assumes the final data size is the sum of all leaf shuffle data sizes. This is not true in many cases, e.g. with Join, Expand, Aggregate.
I think we should fix this assumption instead of adjusting the advisory size. We should transform the query plan and understand the lineage between the leaf shuffles and the final result. The goal is to make the result partition size 64 MB, and we need an algorithm to determine the target shuffle partition size for each leaf shuffle node, taking into account the special nodes in the query plan (join, expand, aggregate, etc.).
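One possible reading of this suggestion, sketched below under strong assumptions: give every operator an estimated output-to-input size ratio, walk from the final result down to each leaf shuffle, and divide the desired result partition size by the accumulated ratio. This is only an illustration, not an algorithm proposed in this thread or implemented in Spark. `Node`, `LeafShuffle`, `Op`, `growth`, and `leafTargets` are hypothetical names, and the ratios are made-up inputs.

```scala
// Hypothetical sketch: derive a per-leaf target partition size from plan lineage.
object LineageSketch {
  sealed trait Node
  final case class LeafShuffle(name: String) extends Node
  // growth: assumed ratio of this operator's output size to its input size (must be > 0).
  final case class Op(name: String, growth: Double, children: Seq[Node]) extends Node

  // Walk from the result to each leaf shuffle, multiplying the growth ratios
  // along the path, then divide the desired result partition size by that product.
  def leafTargets(root: Node, resultTargetBytes: Long): Map[String, Long] = {
    def go(node: Node, growthToResult: Double): Seq[(String, Long)] = node match {
      case LeafShuffle(name) =>
        Seq(name -> math.max(1L, (resultTargetBytes / growthToResult).toLong))
      case Op(_, growth, children) =>
        children.flatMap(go(_, growthToResult * growth))
    }
    go(root, 1.0).toMap
  }

  def main(args: Array[String]): Unit = {
    val target = 64L * 1024 * 1024 // 64 MB result partitions
    // An Expand that doubles its input, directly above one leaf shuffle.
    val plan = Op("Expand", 2.0, Seq(LeafShuffle("s1")))
    println(leafTargets(plan, target)) // s1 gets a 32 MB target
  }
}
```

A real implementation would also need size estimates per operator and a policy for joins, where the two inputs contribute differently to the output; the single `growth` number per node glosses over both.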
[GitHub] [spark] wangyum commented on a diff in pull request #37451: [SPARK-40017][SQL] Adaptive adjustment `spark.sql.adaptive.advisoryPartitionSizeInBytes`
Posted by GitBox <gi...@apache.org>.
wangyum commented on code in PR #37451:
URL: https://github.com/apache/spark/pull/37451#discussion_r947399839
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala:
##########
@@ -163,6 +163,16 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
}.getOrElse(plan)
case other => other.mapChildren(updateShuffleReads(_, specsMap))
}
+
+ // Adjust advisory partition size in bytes based on following operators.
+ private def adjustAdvisoryPartitionSizeInBytes(plan: SparkPlan, conf: SQLConf): Long = {
+ val postShuffleInputSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
+ val factor = 1.0 + plan.collect {
+ case e: ExpandExec => 0.5 * e.projections.size
+ case _ => 0
Review Comment:
In our production environment, we support more operators:
```scala
case _: BroadcastNestedLoopJoinExec => 4
case SortMergeJoinExec(_, _, _, Some(condition), _, _, _)
    if containsInequalityPredicate(condition) => 3
case ObjectHashAggregateExec(_, _, aggregateExpr, _, _, _, _)
    if aggregateExpr.exists {
      _.aggregateFunction match {
        case _: Percentile => true
        case _: ApproximatePercentile => true
        case _ => false
      }
    } => 2
```
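For readers who want to see the shape of this heuristic end to end, here is a minimal, self-contained sketch on a simplified plan tree. It does not use Spark classes: `Plan`, `Shuffle`, `Expand`, `NestedLoopJoin`, `Other`, `nodes`, `factor`, and `adjustedAdvisorySize` are hypothetical names. The weights (0.5 per Expand projection, 4 for a nested loop join) are taken from the snippets above. Two things are assumptions, because the quoted diff is cut off before them: that the per-operator weights are summed, and that the advisory size is divided by the resulting factor.

```scala
// Hypothetical sketch of an operator-weighted advisory partition size.
object AdvisorySizeSketch {
  // Simplified stand-ins for SparkPlan operators.
  sealed trait Plan { def children: Seq[Plan] }
  final case class Shuffle(bytes: Long) extends Plan {
    val children: Seq[Plan] = Nil
  }
  final case class Expand(numProjections: Int, child: Plan) extends Plan {
    val children: Seq[Plan] = Seq(child)
  }
  final case class NestedLoopJoin(left: Plan, right: Plan) extends Plan {
    val children: Seq[Plan] = Seq(left, right)
  }
  final case class Other(children: Seq[Plan]) extends Plan

  // Pre-order walk over the tree, a stand-in for collecting plan nodes.
  def nodes(plan: Plan): Seq[Plan] = plan +: plan.children.flatMap(nodes)

  // Assumption: the weights of all matching operators are summed on top of 1.0.
  def factor(plan: Plan): Double = 1.0 + nodes(plan).map {
    case Expand(n, _)      => 0.5 * n
    case _: NestedLoopJoin => 4.0
    case _                 => 0.0
  }.sum

  // Assumption: a larger factor shrinks the advisory size, giving more partitions.
  def adjustedAdvisorySize(plan: Plan, advisoryBytes: Long): Long =
    math.max(1L, (advisoryBytes / factor(plan)).toLong)

  def main(args: Array[String]): Unit = {
    val advisory = 64L * 1024 * 1024
    val plan = Other(Seq(Expand(2, Shuffle(1L << 30))))
    // factor = 1.0 + 0.5 * 2 = 2.0, so the 64 MB advisory size becomes 32 MB
    println(adjustedAdvisorySize(plan, advisory))
  }
}
```

Because the weights are fixed constants per operator type, the result depends only on which operators appear in the plan, not on the data they actually process.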
[GitHub] [spark] maryannxue commented on pull request #37451: [SPARK-40017][SQL] Adaptive adjustment `spark.sql.adaptive.advisoryPartitionSizeInBytes`
Posted by GitBox <gi...@apache.org>.
maryannxue commented on PR #37451:
URL: https://github.com/apache/spark/pull/37451#issuecomment-1218260537
I'm afraid this is more of an ad-hoc change specific to one type of query or scenario. The heuristics might not work well with other cases.
[GitHub] [spark] HyukjinKwon commented on pull request #37451: [SPARK-40017][SQL] Adaptive adjustment `spark.sql.adaptive.advisoryPartitionSizeInBytes`
Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on PR #37451:
URL: https://github.com/apache/spark/pull/37451#issuecomment-1210013200
@maryannxue FYI
[GitHub] [spark] github-actions[bot] closed pull request #37451: [SPARK-40017][SQL] Adaptive adjustment `spark.sql.adaptive.advisoryPartitionSizeInBytes`
Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed pull request #37451: [SPARK-40017][SQL] Adaptive adjustment `spark.sql.adaptive.advisoryPartitionSizeInBytes`
URL: https://github.com/apache/spark/pull/37451