Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:20:15 UTC

[jira] [Updated] (SPARK-17497) Preserve order when scanning ordered buckets over multiple partitions

     [ https://issues.apache.org/jira/browse/SPARK-17497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-17497:
---------------------------------
    Labels: bulk-closed  (was: )

> Preserve order when scanning ordered buckets over multiple partitions
> ---------------------------------------------------------------------
>
>                 Key: SPARK-17497
>                 URL: https://issues.apache.org/jira/browse/SPARK-17497
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>            Reporter: Fridtjof Sander
>            Priority: Minor
>              Labels: bulk-closed
>
> Non-associative aggregations (such as `collect_list`) require the data to be sorted on the grouping key so that the aggregation groups can be extracted.
> Let `table` be a Hive table that is partitioned on `p` and bucketed and sorted on `id`. Let `q` be a query that executes a non-associative aggregation on `table.id` over multiple partitions `p`.
> Currently, when executing `q`, Spark creates as many RDD partitions as there are buckets. Each RDD partition is created in `FileScanRDD` by fetching the associated buckets from all requested Hive partitions. Because the buckets are read one by one, the resulting RDD partition is no longer sorted on `id` and has to be sorted explicitly before the aggregation is performed. This sort introduces a blocking step into the execution pipeline.
> In this Jira I propose offering the optimizer an alternative bucket-fetching strategy that preserves the internal sorting in the situation described above.
> One way to achieve this is to open all buckets over all partitions simultaneously when fetching the data. Since each bucket is internally sorted, we can essentially perform the merge step of a merge sort on the collection of bucket iterators and directly emit a sorted RDD partition that can be piped into the next operator.
> While there should be no question about the theoretical feasibility of this idea, there are some obvious implications, e.g. with regard to IO handling.
> I would like to investigate the practical feasibility, limits, gains and drawbacks of this optimization in my master's thesis and, of course, contribute the implementation. Before I start, however, I would like to ask you, the community, for any thoughts, opinions, corrections or other feedback, which would be much appreciated.
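
The difference between the two fetching strategies in the description above can be illustrated with a small standalone sketch. It is not Spark code and does not touch `FileScanRDD`. The sample rows and the helper names `scan_sequential`, `scan_merged` and `is_sorted_on_id` are hypothetical and exist only for illustration. The sketch assumes three sorted copies of the same bucket, one per Hive partition, and uses Python's `heapq.merge` as a stand-in for the proposed merge over bucket iterators.

```python
import heapq
from itertools import chain

# Hypothetical data: the same bucket taken from three Hive partitions.
# Each list is already sorted on `id`; rows are (id, value) tuples.
bucket_p1 = [(1, "a"), (4, "d"), (7, "g")]
bucket_p2 = [(1, "b"), (2, "c"), (7, "h")]
bucket_p3 = [(3, "e"), (4, "f")]


def scan_sequential(buckets):
    """Read the buckets one after another, as the description says happens today."""
    return chain.from_iterable(buckets)


def scan_merged(buckets):
    """Open all buckets at once and lazily merge them on the sort key `id`."""
    return heapq.merge(*buckets, key=lambda row: row[0])


def is_sorted_on_id(rows):
    """Check that the ids of the rows are in non-decreasing order."""
    ids = [row[0] for row in rows]
    return all(a <= b for a, b in zip(ids, ids[1:]))


buckets = [bucket_p1, bucket_p2, bucket_p3]
sequential_rows = list(scan_sequential(buckets))
merged_rows = list(scan_merged(buckets))

print(is_sorted_on_id(sequential_rows))  # concatenation loses the ordering
print(is_sorted_on_id(merged_rows))      # merging keeps it
```

In this sketch the merge holds only one pending row per open bucket, which mirrors the IO concern raised above: the cost moves from sorting to keeping many files open at the same time.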


