Posted to github@arrow.apache.org by "devinjdangelo (via GitHub)" <gi...@apache.org> on 2023/10/05 01:51:21 UTC

[PR] Support InsertInto Sorted ListingTable [arrow-datafusion]

devinjdangelo opened a new pull request, #7743:
URL: https://github.com/apache/arrow-datafusion/pull/7743

   ## Which issue does this PR close?
   
   Closes #7354 
   
   ## Rationale for this change
   
   See issue
   
   ## What changes are included in this PR?
   
   Allows specifying a `required_input_ordering` on a `FileSinkExec`, which forces the output to be sorted in a particular way. The setting is optional, so `Copy To` can keep its existing behavior while `ListingTable` can optionally inject its required sort order.
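   The planner-level effect can be sketched with toy types (this is an illustrative model, not DataFusion's planner or its actual `ExecutionPlan` API): when a sink declares a required input ordering that its input does not already satisfy, a sort is inserted; with no requirement, the plan is unchanged.
   
   ```rust
   // Toy model of "optional required input ordering". `Plan` and `plan_sink`
   // are made up for illustration; only the idea mirrors the PR.
   #[derive(Clone, Debug, PartialEq)]
   enum Plan {
       Input,
       Sort(Box<Plan>),
       Sink(Box<Plan>),
   }
   
   fn plan_sink(required_ordering: Option<&str>, input_is_sorted: bool) -> Plan {
       let input = Plan::Input;
       let child = match required_ordering {
           // A requirement that is not already satisfied forces a sort.
           Some(_) if !input_is_sorted => Plan::Sort(Box::new(input)),
           // No requirement (or already sorted): input passes through as-is.
           _ => input,
       };
       Plan::Sink(Box::new(child))
   }
   
   fn main() {
       // `Copy To` keeps its existing behavior: no requirement, no extra sort.
       assert_eq!(plan_sink(None, false), Plan::Sink(Box::new(Plan::Input)));
       // A sorted ListingTable injects its order, so a Sort appears.
       assert_eq!(
           plan_sink(Some("a ASC, b DESC"), false),
           Plan::Sink(Box::new(Plan::Sort(Box::new(Plan::Input))))
       );
       println!("ok");
   }
   ```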
   
   ## Are these changes tested?
   
   Yes, via a new sqllogic test
   
   ## Are there any user-facing changes?
   
   Inserting into a table with a required order now works and maintains the required file ordering.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Support InsertInto Sorted ListingTable [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #7743:
URL: https://github.com/apache/arrow-datafusion/pull/7743#discussion_r1349298428


##########
datafusion/physical-plan/src/insert.rs:
##########
@@ -73,6 +73,8 @@ pub struct FileSinkExec {
     sink_schema: SchemaRef,
     /// Schema describing the structure of the output data.
     count_schema: SchemaRef,
+    /// Optional required sort order for output data.
+    sort_order: Option<Vec<Option<Vec<PhysicalSortRequirement>>>>,

Review Comment:
   Since FileSink can have only a single input, I think it only needs a single sort order per [`required_input_ordering`](https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#method.required_input_ordering)
   
   In other words, I think this could be simplified to 
   
   ```suggestion
    sort_order: Option<Vec<PhysicalSortRequirement>>,
   ```
   
   And then adjust `required_input_ordering` appropriately.
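   The suggested simplification can be sketched with mock types (not DataFusion's actual definitions): the sink stores a single optional sort order, and wraps it in a one-element vec when reporting its required input ordering, since a file sink has exactly one child.
   
   ```rust
   // Mock stand-in for PhysicalSortRequirement, for illustration only.
   #[derive(Clone, Debug, PartialEq)]
   struct SortRequirement {
       column: String,
       descending: bool,
   }
   
   struct FileSinkSketch {
       /// Optional required sort order for output data (the simplified field).
       sort_order: Option<Vec<SortRequirement>>,
   }
   
   impl FileSinkSketch {
       /// Mirrors the shape of `required_input_ordering`: one `Option<Vec<_>>`
       /// per child, and a file sink has exactly one child.
       fn required_input_ordering(&self) -> Vec<Option<Vec<SortRequirement>>> {
           vec![self.sort_order.clone()]
       }
   }
   
   fn main() {
       let unsorted = FileSinkSketch { sort_order: None };
       let expected: Vec<Option<Vec<SortRequirement>>> = vec![None];
       assert_eq!(unsorted.required_input_ordering(), expected);
   
       let req = vec![SortRequirement { column: "a".to_string(), descending: false }];
       let sorted = FileSinkSketch { sort_order: Some(req.clone()) };
       assert_eq!(sorted.required_input_ordering(), vec![Some(req)]);
       println!("ok");
   }
   ```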
   



##########
datafusion/sqllogictest/test_files/insert_to_external.slt:
##########
@@ -45,6 +45,35 @@ LOCATION '../../testing/data/csv/aggregate_test_100.csv'
 statement ok
 set datafusion.execution.target_partitions = 8;
 
+statement ok
+CREATE EXTERNAL TABLE
+ordered_insert_test(a bigint, b bigint)
+STORED AS csv
+LOCATION 'test_files/scratch/insert_to_external/insert_to_ordered/'
+WITH ORDER (a ASC, B DESC)
+OPTIONS(
+create_local_path 'true',
+insert_mode 'append_new_files',
+);
+
+query II
+INSERT INTO ordered_insert_test values (5, 1), (4, 2), (7,7), (7,8), (7,9), (7,10), (3, 3), (2, 4), (1, 5);
+----
+9

Review Comment:
   Can you please also add an `EXPLAIN INSERT INTO ordered_insert_test values (5, 1), (4, 2)` to verify that the plan has a `SortExec` in it?



##########
datafusion/core/src/physical_planner.rs:
##########
@@ -604,7 +604,7 @@ impl DefaultPhysicalPlanner {
                         FileType::ARROW => Arc::new(ArrowFormat {}),
                     };
 
-                    sink_format.create_writer_physical_plan(input_exec, session_state, config).await
+                    sink_format.create_writer_physical_plan(input_exec, session_state, config, None).await

Review Comment:
   👍 






Re: [PR] Support InsertInto Sorted ListingTable [arrow-datafusion]

Posted by "devinjdangelo (via GitHub)" <gi...@apache.org>.
devinjdangelo commented on code in PR #7743:
URL: https://github.com/apache/arrow-datafusion/pull/7743#discussion_r1349322356


##########
datafusion/physical-plan/src/insert.rs:
##########
@@ -73,6 +73,8 @@ pub struct FileSinkExec {
     sink_schema: SchemaRef,
     /// Schema describing the structure of the output data.
     count_schema: SchemaRef,
+    /// Optional required sort order for output data.
+    sort_order: Option<Vec<Option<Vec<PhysicalSortRequirement>>>>,

Review Comment:
   This makes sense. Will do!





Re: [PR] Support InsertInto Sorted ListingTable [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #7743:
URL: https://github.com/apache/arrow-datafusion/pull/7743#issuecomment-1751391242

   > However, if I insert the same data again, now the data is not sorted!
   
   
   
   I couldn't reproduce this locally with a smaller reproducer and I am out of time. I'll investigate more when I have time.




Re: [PR] Support InsertInto Sorted ListingTable [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #7743:
URL: https://github.com/apache/arrow-datafusion/pull/7743#discussion_r1349747256


##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -907,17 +907,19 @@ impl TableProvider for ListingTable {
                     "Cannot insert into a sorted ListingTable with mode append!".into(),
                 ));
             }
+            // Multiple sort orders in outer vec are equivalent, so we pass only the first one

Review Comment:
   👍 






Re: [PR] Support InsertInto Sorted ListingTable [arrow-datafusion]

Posted by "devinjdangelo (via GitHub)" <gi...@apache.org>.
devinjdangelo commented on PR #7743:
URL: https://github.com/apache/arrow-datafusion/pull/7743#issuecomment-1751409949

   > I found a few other issues, but I don't think they are caused by this PR
   > 
   > ```shell
   > $ mkdir /tmp/output
   > $ datafusion-cli
   > DataFusion CLI v31.0.0
   > ❯ create external table output(time timestamp) stored as parquet location '/tmp/output' with order (time);
   > 0 rows in set. Query took 0.003 seconds.
   > 
   > ❯ insert into output values (now()), (now() - interval '1 minute'), (now() + interval '2 minutes'), (now() - interval '3 minutes');
   > +-------+
   > | count |
   > +-------+
   > | 4     |
   > +-------+
   > 1 row in set. Query took 0.144 seconds.
   > 
   > ❯ select * from output;
   > +----------------------------+
   > | time                       |
   > +----------------------------+
   > | 2023-10-06T20:31:08.042535 |
   > | 2023-10-06T20:33:08.042535 |
   > | 2023-10-06T20:34:08.042535 |
   > | 2023-10-06T20:36:08.042535 |
   > +----------------------------+
   > 4 rows in set. Query took 0.005 seconds.
   > ```
   > 
   > However, if I insert the same data again, now the data is not sorted!
   > 
   > ```
   > ❯ insert into output values (now()), (now() - interval '1 minute'), (now() + interval '2 minutes'), (now() - interval '3 minutes');
   > +-------+
   > | count |
   > +-------+
   > | 4     |
   > +-------+
   > 1 row in set. Query took 0.178 seconds.
   > 
   > ❯ select * from output;
   > +----------------------------+
   > | time                       |
   > +----------------------------+
   > | 2023-10-06T20:32:38.514236 |
   > | 2023-10-06T20:34:38.514236 |
   > | 2023-10-06T20:35:38.514236 |
   > | 2023-10-06T20:37:38.514236 |
   > | 2023-10-06T20:31:08.042535 |
   > | 2023-10-06T20:33:08.042535 |
   > | 2023-10-06T20:34:08.042535 |
   > | 2023-10-06T20:36:08.042535 |
   > +----------------------------+
   > 8 rows in set. Query took 0.005 seconds.
   > ```
   
   I actually think the above is correct behavior. The table is not globally sorted; rather, each individual file is sorted. Each time you insert, at least one new file is created. In the above result we see two independently sorted chunks, which means each insert created one new sorted file.
   
   > 
   > I also found that there were a huge number of empty output files created
   > 
   > ```
   > alamb@MacBook-Pro-8:~/Software/arrow-datafusion2/datafusion-cli$ ls -ltr /tmp/output
   > total 256
   > -rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_1.parquet
   > -rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_4.parquet
   > -rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_8.parquet
   > -rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_10.parquet
   > -rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_2.parquet
   > -rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_7.parquet
   > -rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_13.parquet
   > -rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_9.parquet
   > -rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_3.parquet
   > -rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_12.parquet
   > -rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_6.parquet
   > -rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_14.parquet
   > -rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_15.parquet
   > -rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_11.parquet
   > -rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:34 1PHmXyyoDVGbi7oo_5.parquet
   > -rw-r--r--@ 1 alamb  wheel   615B Oct  6 16:34 1PHmXyyoDVGbi7oo_0.parquet
   > -rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_2.parquet
   > -rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_5.parquet
   > -rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_6.parquet
   > -rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_4.parquet
   > -rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_12.parquet
   > -rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_11.parquet
   > -rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_3.parquet
   > -rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_9.parquet
   > -rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_8.parquet
   > -rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_14.parquet
   > -rw-r--r--@ 1 alamb  wheel   615B Oct  6 16:35 FtsEcvDwXi7JVaVq_0.parquet
   > -rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_7.parquet
   > -rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_15.parquet
   > -rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_10.parquet
   > -rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_13.parquet
   > -rw-r--r--@ 1 alamb  wheel   289B Oct  6 16:35 FtsEcvDwXi7JVaVq_1.parquet
   > ```
   > 
   > But I don't think this is caused by this PR - #5383
   
   Yeah, `FileSinks` currently output one file for each input stream, and the number of streams is determined by the optimizer. Based on your result, I would guess that your system has 16 vcores, so you end up with 1 stream containing data and 15 empty streams.
   
   We could solve this by making `FileSinks` a little more intelligent in how they partition the output. For example, they could be configured with a target file size. This idea is related to what will be required for #7744.
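   The chunked layout described above can be sketched as follows (toy data, not the timestamps from this session): each insert appends one sorted file, so a scan that concatenates files sees independently sorted chunks, not one globally sorted sequence.
   
   ```rust
   // Demonstrates per-file (chunk) sortedness vs. global sortedness.
   fn is_sorted(xs: &[i64]) -> bool {
       xs.windows(2).all(|w| w[0] <= w[1])
   }
   
   fn main() {
       let first_insert = vec![1, 3, 5, 7];  // file written by insert #1
       let second_insert = vec![2, 4, 6, 8]; // file written by insert #2
       // Each file is sorted on its own...
       assert!(is_sorted(&first_insert));
       assert!(is_sorted(&second_insert));
   
       // ...but scanning file after file concatenates the chunks,
       // which is not globally sorted, matching the observed output.
       let scanned: Vec<i64> = first_insert.iter().chain(&second_insert).copied().collect();
       assert!(!is_sorted(&scanned));
       println!("ok");
   }
   ```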
   




Re: [PR] Support InsertInto Sorted ListingTable [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb merged PR #7743:
URL: https://github.com/apache/arrow-datafusion/pull/7743




Re: [PR] Support InsertInto Sorted ListingTable [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #7743:
URL: https://github.com/apache/arrow-datafusion/pull/7743#issuecomment-1752106345

   > > But now that I look at that plan, perhaps the issue is that there is more than one file in each group, so the sort order can't be maintained 🤔
   > 
   > Perhaps if each output file were larger we would avoid that issue. It would be nice if you could configure a desired file output size rather than relying on the plan partitioning... I opened #7767 for this.
   
   I think the issue is explained here: https://github.com/apache/arrow-datafusion/blob/3d1b23a04bdc04c526e2dcb06e0cf1995707587d/datafusion/core/src/datasource/physical_plan/mod.rs#L408-L467
   
   (which is not all that easy to find)




Re: [PR] Support InsertInto Sorted ListingTable [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #7743:
URL: https://github.com/apache/arrow-datafusion/pull/7743#issuecomment-1751421496

   > I actually think the above is correct behavior. The table is not globally sorted, but rather each individual file is sorted. Each time you insert, at least one new file is inserted. In the above result we see two independently sorted chunks, which means each insert created one new sorted file.
   
   Yes, I think you are right. However, when I ran `EXPLAIN` I expected to see no sorts (since each file is sorted, the files can just be merged with `SortPreservingMerge`):
   
   ```
   ❯ explain select * from output order by time;
   +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type     | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
   +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | logical_plan  | Sort: output.time ASC NULLS LAST                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
   |               |   TableScan: output projection=[time]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
   | physical_plan | SortPreservingMergeExec: [time@0 ASC NULLS LAST]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
   |               |   SortExec: expr=[time@0 ASC NULLS LAST]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
   |               |     ParquetExec: file_groups={16 groups: [[private/tmp/output/FtsEcvDwXi7JVaVq_6.parquet, private/tmp/output/FtsEcvDwXi7JVaVq_12.parquet], [private/tmp/output/1PHmXyyoDVGbi7oo_5.parquet, private/tmp/output/1PHmXyyoDVGbi7oo_12.parquet], [private/tmp/output/1PHmXyyoDVGbi7oo_4.parquet, private/tmp/output/1PHmXyyoDVGbi7oo_13.parquet], [private/tmp/output/FtsEcvDwXi7JVaVq_13.parquet, private/tmp/output/FtsEcvDwXi7JVaVq_7.parquet], [private/tmp/output/1PHmXyyoDVGbi7oo_11.parquet, private/tmp/output/1PHmXyyoDVGbi7oo_6.parquet], ...]}, projection=[time] |
   |               |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
   +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   2 rows in set. Query took 0.003 seconds.
   ```
   
   But now that I look at that plan, perhaps the issue is that there is more than one file in each group, so the sort order can't be maintained 🤔 
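   The merge-instead-of-sort idea can be sketched as follows (a toy k-way merge, not `SortPreservingMergeExec` itself): when every input run is fully sorted and each stream carries a single run, repeatedly taking the smallest head recovers global order with no full re-sort. If a file group mixes several files, the stream is no longer one sorted run, which would explain the extra `SortExec` above.
   
   ```rust
   // Toy k-way merge over pre-sorted runs, using a min-heap of run heads.
   use std::cmp::Reverse;
   use std::collections::BinaryHeap;
   
   fn merge_sorted_runs(runs: Vec<Vec<i64>>) -> Vec<i64> {
       // Heap entries are (next value, run index, position), min-first via Reverse.
       let mut heap = BinaryHeap::new();
       for (i, run) in runs.iter().enumerate() {
           if let Some(&v) = run.first() {
               heap.push(Reverse((v, i, 0usize)));
           }
       }
       let mut out = Vec::new();
       while let Some(Reverse((v, i, pos))) = heap.pop() {
           out.push(v);
           if let Some(&next) = runs[i].get(pos + 1) {
               heap.push(Reverse((next, i, pos + 1)));
           }
       }
       out
   }
   
   fn main() {
       // Two sorted "files", one per stream: merged output is globally sorted.
       let files = vec![vec![1, 3, 5, 7], vec![2, 4, 6]];
       assert_eq!(merge_sorted_runs(files), vec![1, 2, 3, 4, 5, 6, 7]);
       println!("ok");
   }
   ```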




Re: [PR] Support InsertInto Sorted ListingTable [arrow-datafusion]

Posted by "devinjdangelo (via GitHub)" <gi...@apache.org>.
devinjdangelo commented on code in PR #7743:
URL: https://github.com/apache/arrow-datafusion/pull/7743#discussion_r1349516448


##########
datafusion/sqllogictest/test_files/insert_to_external.slt:
##########
@@ -45,6 +45,35 @@ LOCATION '../../testing/data/csv/aggregate_test_100.csv'
 statement ok
 set datafusion.execution.target_partitions = 8;
 
+statement ok
+CREATE EXTERNAL TABLE
+ordered_insert_test(a bigint, b bigint)
+STORED AS csv
+LOCATION 'test_files/scratch/insert_to_external/insert_to_ordered/'
+WITH ORDER (a ASC, B DESC)
+OPTIONS(
+create_local_path 'true',
+insert_mode 'append_new_files',
+);
+
+query II
+INSERT INTO ordered_insert_test values (5, 1), (4, 2), (7,7), (7,8), (7,9), (7,10), (3, 3), (2, 4), (1, 5);
+----
+9

Review Comment:
   Just pushed an update with the explain test.



##########
datafusion/physical-plan/src/insert.rs:
##########
@@ -73,6 +73,8 @@ pub struct FileSinkExec {
     sink_schema: SchemaRef,
     /// Schema describing the structure of the output data.
     count_schema: SchemaRef,
+    /// Optional required sort order for output data.
+    sort_order: Option<Vec<Option<Vec<PhysicalSortRequirement>>>>,

Review Comment:
   Just pushed an update with this change





Re: [PR] Support InsertInto Sorted ListingTable [arrow-datafusion]

Posted by "devinjdangelo (via GitHub)" <gi...@apache.org>.
devinjdangelo commented on PR #7743:
URL: https://github.com/apache/arrow-datafusion/pull/7743#issuecomment-1751698060

   > But now that I look at that plan, perhaps the issue is that there is more than one file in each group, so the sort order can't be maintained 🤔
   
   Perhaps if each output file were larger we would avoid that issue. It would be nice if you could configure a desired file output size rather than relying on the plan partitioning... I opened #7767 for this.
   

