Posted to issues@spark.apache.org by "Dongjoon Hyun (Jira)" <ji...@apache.org> on 2020/03/16 22:52:07 UTC
[jira] [Updated] (SPARK-24528) Missing optimization for Aggregations/Windowing on a bucketed table
[ https://issues.apache.org/jira/browse/SPARK-24528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dongjoon Hyun updated SPARK-24528:
----------------------------------
Affects Version/s: 3.1.0 (was: 3.0.0)
> Missing optimization for Aggregations/Windowing on a bucketed table
> -------------------------------------------------------------------
>
> Key: SPARK-24528
> URL: https://issues.apache.org/jira/browse/SPARK-24528
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.1.0
> Reporter: Ohad Raviv
> Priority: Major
>
> Closely related to SPARK-24410, we're trying to optimize a very common use case we have: getting the most recently updated row per id from a fact table.
> We're saving the table bucketed to skip the shuffle stage, but we still waste time on the Sort operator even though the data is already sorted.
> Here's a good example:
> {code:java}
> import org.apache.spark.sql.SaveMode
> import org.apache.spark.sql.functions.col
>
> // Write a table bucketed and sorted by key, so later aggregations on key
> // should need neither a shuffle nor a sort. (N is the desired row count.)
> sparkSession.range(N).selectExpr(
>     "id as key",
>     "id % 2 as t1",
>     "id % 3 as t2")
>   .repartition(col("key"))
>   .write
>   .mode(SaveMode.Overwrite)
>   .bucketBy(3, "key")
>   .sortBy("key", "t1")
>   .saveAsTable("a1")
> {code}
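>
> (For reference, the bucketing/sort spec of the saved table can be double-checked with a describe; it should list Num Buckets: 3, Bucket Columns: [`key`], Sort Columns: [`key`, `t1`]:)
> {code:java}
> // Sanity check: confirm the table was saved with the expected bucket spec.
> sparkSession.sql("describe formatted a1").show(100, truncate = false)
> {code}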
> {code:java}
> sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain
>
> == Physical Plan ==
> SortAggregate(key=[key#24L], functions=[max(named_struct(t1, t1#25L, key, key#24L, t1, t1#25L, t2, t2#26L))])
> +- SortAggregate(key=[key#24L], functions=[partial_max(named_struct(t1, t1#25L, key, key#24L, t1, t1#25L, t2, t2#26L))])
>    +- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: true, Format: Parquet, Location: ...
> {code}
>
> And here's a bad, but more realistic, example:
> {code:java}
> sparkSession.sql("set spark.sql.shuffle.partitions=2")
> sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain
>
> == Physical Plan ==
> SortAggregate(key=[key#32L], functions=[max(named_struct(t1, t1#33L, key, key#32L, t1, t1#33L, t2, t2#34L))])
> +- SortAggregate(key=[key#32L], functions=[partial_max(named_struct(t1, t1#33L, key, key#32L, t1, t1#33L, t2, t2#34L))])
>    +- *(1) Sort [key#32L ASC NULLS FIRST], false, 0
>       +- *(1) FileScan parquet default.a1[key#32L,t1#33L,t2#34L] Batched: true, Format: Parquet, Location: ...
> {code}
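>
> For quick experimentation, the presence of the extra Sort can also be checked programmatically (a small hypothetical helper, not part of Spark):
> {code:java}
> // Hypothetical helper: render the executed physical plan to a string and
> // look for a Sort node, so both configurations can be compared in one run.
> def hasSortNode(df: org.apache.spark.sql.DataFrame): Boolean =
>   df.queryExecution.executedPlan.toString.contains("Sort [")
>
> val q = sparkSession.sql("select max(struct(t1, *)) from a1 group by key")
> println(s"plan contains Sort: ${hasSortNode(q)}")
> {code}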
>
> I've traced the problem to DataSourceScanExec#235:
> {code:java}
> val sortOrder = if (sortColumns.nonEmpty) {
>   // In case of bucketing, its possible to have multiple files belonging to the
>   // same bucket in a given relation. Each of these files are locally sorted
>   // but those files combined together are not globally sorted. Given that,
>   // the RDD partition will not be sorted even if the relation has sort columns set
>   // Current solution is to check if all the buckets have a single file in it
>   val files = selectedPartitions.flatMap(partition => partition.files)
>   val bucketToFilesGrouping =
>     files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
>   val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)
> {code}
> So, obviously, the code currently avoids dealing with this situation.
> Could you think of a way to solve this or bypass it?
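>
> One direction we could try in the meantime: guarantee a single file per bucket at write time, which would satisfy the check above. A minimal sketch, assuming that repartitioning into exactly numBuckets partitions on the bucket column makes each write task emit one file per bucket:
> {code:java}
> // Sketch of a possible workaround (an assumption, not a verified guarantee):
> // shuffle into exactly as many partitions as there are buckets, keyed on the
> // bucket column, so each bucket's rows land in a single task and therefore a
> // single file, letting the file scan report the sort order.
> sparkSession.range(N).selectExpr(
>     "id as key",
>     "id % 2 as t1",
>     "id % 3 as t2")
>   .repartition(3, col("key"))   // 3 == number of buckets used in bucketBy below
>   .write
>   .mode(SaveMode.Overwrite)
>   .bucketBy(3, "key")
>   .sortBy("key", "t1")
>   .saveAsTable("a1")
> {code}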
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org