Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/11 19:22:38 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #4170: Add ability to specify external sort information for ListingTables

alamb commented on code in PR #4170:
URL: https://github.com/apache/arrow-datafusion/pull/4170#discussion_r1020503156


##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -220,6 +225,16 @@ pub struct ListingOptions {
     /// Group files to avoid that the number of partitions exceeds
     /// this limit
     pub target_partitions: usize,
+    /// Optional pre-known sort order. Must be `SortExpr`s.
+    ///
+    /// DataFusion may take advantage of this ordering to omit sorts
+    /// or use more efficient algorithms. Currently sortedness must be
+    /// provided if it is known by some external mechanism, but may in
+    /// the future be automatically determined, for example using
+    /// parquet metadata.
+    ///
+    /// See <https://github.com/apache/arrow-datafusion/issues/4177>
+    pub file_sort_order: Option<Vec<Expr>>,

Review Comment:
   As I was working on this feature, it occurred to me that if the sort order is to be maintained, the ListingTable can't put multiple files into the same partition 🤔
   
   
   The ListingTable does not attempt to read all files concurrently; instead, it reads the files within a partition in sequence. This is an important property, as it allows plans to run against 1000s of files without trying to open them all concurrently.
   
   However, it means that if we assign more than one file to a partition, the output sort order will not be preserved, as illustrated in these diagrams.
   
   When only 1 file is assigned to each partition, each partition is correctly sorted on `(A, B, C)`
   
   ```text
   ┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┓
     ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ┐
   ┃   ┌───────────────┐     ┌──────────────┐ │   ┌──────────────┐ │   ┌─────────────┐   ┃
     │ │   1.parquet   │ │ │ │  2.parquet   │   │ │  3.parquet   │   │ │  4.parquet  │ │
   ┃   │ Sort: A, B, C │     │Sort: A, B, C │ │   │Sort: A, B, C │ │   │Sort: A, B, C│   ┃
     │ └───────────────┘ │ │ └──────────────┘   │ └──────────────┘   │ └─────────────┘ │
   ┃                                          │                    │                     ┃
     │                   │ │                    │                    │                 │
   ┃                                          │                    │                     ┃
     │                   │ │                    │                    │                 │
   ┃                                          │                    │                     ┃
     │                   │ │                    │                    │                 │
   ┃  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃
        DataFusion           DataFusion           DataFusion           DataFusion
   ┃    Partition 1          Partition 2          Partition 3          Partition 4       ┃
    ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━
   
                                         ParquetExec
   ```
   
   
   When more than 1 file is assigned to each partition, each partition is NOT correctly sorted on `(A, B, C)`. Once the second file is scanned, its values for A, B and C can sort before values already emitted from the first file, so the stream as a whole is no longer sorted.
   
   ```text
   ┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━
     ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃
   ┃   ┌───────────────┐     ┌──────────────┐ │
     │ │   1.parquet   │ │ │ │  2.parquet   │   ┃
   ┃   │ Sort: A, B, C │     │Sort: A, B, C │ │
     │ └───────────────┘ │ │ └──────────────┘   ┃
   ┃   ┌───────────────┐     ┌──────────────┐ │
     │ │   3.parquet   │ │ │ │  4.parquet   │   ┃
   ┃   │ Sort: A, B, C │     │Sort: A, B, C │ │
     │ └───────────────┘ │ │ └──────────────┘   ┃
   ┃                                          │
     │                   │ │                    ┃
   ┃  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
        DataFusion           DataFusion         ┃
   ┃    Partition 1          Partition 2
    ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ β”›
   
                 ParquetExec
   ```
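   
   To make the problem concrete, here is a minimal standalone sketch (plain Rust, not DataFusion code). The `Row` type, `scan_partition`, and the sample values are all made up for illustration; the only thing it models is that a partition emits its files one after another.
   
   ```rust
   /// A row is an (A, B, C) tuple. Rust compares tuples lexicographically,
   /// which matches a sort on (A, B, C).
   type Row = (i32, i32, i32);
   
   /// Returns true if `rows` is sorted ascending on (A, B, C).
   fn is_sorted(rows: &[Row]) -> bool {
       rows.windows(2).all(|w| w[0] <= w[1])
   }
   
   /// Hypothetical model of one partition: files are read in sequence,
   /// so the output is simply the concatenation of the files.
   fn scan_partition(files: &[Vec<Row>]) -> Vec<Row> {
       files.iter().flatten().copied().collect()
   }
   
   fn main() {
       // Each file is individually sorted on (A, B, C).
       let file1: Vec<Row> = vec![(1, 1, 1), (2, 5, 0), (9, 0, 3)];
       let file3: Vec<Row> = vec![(1, 0, 7), (4, 2, 2)];
   
       // One file per partition: the partition output stays sorted.
       let one_file = scan_partition(&[file1.clone()]);
       println!("one file sorted:  {}", is_sorted(&one_file));
   
       // Two files in one partition: (1, 0, 7) follows (9, 0, 3),
       // so the concatenated stream is not sorted.
       let two_files = scan_partition(&[file1, file3]);
       println!("two files sorted: {}", is_sorted(&two_files));
   }
   ```
   
   Restoring the order for the two-file case would take a merge of the files rather than a sequential read, which would mean opening them concurrently.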
   
   
   Does anyone have a preference between any of the possibilities:
   
   1. Adding a check in DataFusion so that the sort order is ignored if there is more than one file in any partition?
   2. Making more partitions if a sort order is specified?
   3. Making more partitions up to a limit (maybe 2x the target partitions), and ignoring the sort order if that is not enough?
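   
   As a rough sketch of how the three options relate, they could be seen as one policy with a different cap on the number of partitions. Everything below is hypothetical (`PartitionPlan`, `plan_partitions`, and `max_factor` are not DataFusion APIs), and it assumes that without a usable sort order the files are grouped into at most `target_partitions` groups.
   
   ```rust
   /// Hypothetical result of deciding how to lay files out across partitions.
   #[derive(Debug, PartialEq)]
   struct PartitionPlan {
       /// Number of partitions the scan would produce.
       partitions: usize,
       /// Whether the declared file sort order can be reported as the output order.
       preserves_sort_order: bool,
   }
   
   /// Sketch of the policy: keep one file per partition (and the sort order)
   /// as long as that needs at most `target_partitions * max_factor` partitions.
   /// Otherwise group files as usual and ignore the sort order.
   ///
   /// `max_factor = 1` corresponds to option 1, `usize::MAX` to option 2,
   /// and `2` to the "2x" suggestion in option 3.
   fn plan_partitions(
       num_files: usize,
       target_partitions: usize,
       has_sort_order: bool,
       max_factor: usize,
   ) -> PartitionPlan {
       let limit = target_partitions.saturating_mul(max_factor);
       if has_sort_order && num_files <= limit {
           PartitionPlan { partitions: num_files, preserves_sort_order: true }
       } else {
           PartitionPlan {
               partitions: num_files.min(target_partitions),
               preserves_sort_order: false,
           }
       }
   }
   
   fn main() {
       // 6 sorted files, 4 target partitions, 2x cap: one file per partition.
       println!("{:?}", plan_partitions(6, 4, true, 2));
       // 100 sorted files exceed the cap: fall back and ignore the sort order.
       println!("{:?}", plan_partitions(100, 4, true, 2));
   }
   ```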
   
   This affects IOx, see https://github.com/influxdata/influxdb_iox/issues/6125


