Posted to commits@arrow.apache.org by al...@apache.org on 2023/05/04 15:14:12 UTC

[arrow-datafusion] branch main updated: Supply consistent format output for FileScanConfig params (#6202)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 6be75ff2dc Supply consistent format output for FileScanConfig params (#6202)
6be75ff2dc is described below

commit 6be75ff2dcc47128b78a695477512ba86c46373f
Author: Tzu-Chiao Yeh <su...@gmail.com>
AuthorDate: Thu May 4 23:14:06 2023 +0800

    Supply consistent format output for FileScanConfig params (#6202)
    
    * Supply consistent format output for FileScanConfig params
    
    * Compact display output and optimize output ordering display
    
    * Appease clippy
    
    * update sqllogictest
    
    ---------
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
 datafusion-cli/src/object_storage.rs               |   3 +-
 datafusion/core/src/datasource/view.rs             |   3 +-
 .../combine_partial_final_agg.rs                   |   8 +-
 .../src/physical_optimizer/dist_enforcement.rs     | 108 +++++++--------
 .../core/src/physical_optimizer/repartition.rs     |  84 ++++++------
 .../src/physical_optimizer/sort_enforcement.rs     | 152 ++++++++++-----------
 .../core/src/physical_plan/file_format/avro.rs     |   7 +-
 .../core/src/physical_plan/file_format/csv.rs      |   7 +-
 .../core/src/physical_plan/file_format/json.rs     |   7 +-
 .../core/src/physical_plan/file_format/mod.rs      |  40 ++++++
 .../core/src/physical_plan/file_format/parquet.rs  |  30 +---
 .../src/physical_plan/joins/symmetric_hash_join.rs |   8 +-
 datafusion/core/src/physical_plan/mod.rs           |   2 +-
 datafusion/core/tests/sql/avro.rs                  |   2 +-
 datafusion/core/tests/sql/explain_analyze.rs       |   6 +-
 datafusion/core/tests/sql/json.rs                  |   2 +-
 .../tests/sqllogictests/test_files/explain.slt     |  36 ++++-
 .../core/tests/sqllogictests/test_files/order.slt  |   2 +-
 .../core/tests/sqllogictests/test_files/select.slt |   3 +-
 .../core/tests/sqllogictests/test_files/window.slt |  50 +++----
 docs/source/user-guide/sql/explain.md              |   4 +-
 21 files changed, 301 insertions(+), 263 deletions(-)
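
Judging by the diffstat, the shared display logic this commit introduces lives in datafusion/core/src/physical_plan/file_format/mod.rs (the 40 added lines). The Rust sketch below is a minimal, hypothetical illustration of the formatting convention the updated tests expect: parameters always appear as file_groups, then projection, then limit, then output_ordering, with unset ones omitted entirely. ScanParams and fmt_scan_params are invented names for illustration only and are not the committed API; only the output format is taken from the test expectations in this patch.

    use std::fmt::{self, Write};

    /// Hypothetical condensed view of the FileScanConfig fields that show up
    /// in EXPLAIN output. Field names mirror the plan text, not the real API.
    struct ScanParams<'a> {
        /// Pre-rendered file groups, e.g. "{1 group: [[x]]}"
        file_groups: &'a str,
        /// Projected column names, if a projection was pushed down
        projection: Option<&'a [&'a str]>,
        /// Row limit, if one was pushed down
        limit: Option<usize>,
        /// Pre-rendered output ordering, e.g. "[c1@0 ASC]"
        output_ordering: Option<&'a str>,
    }

    /// Render the parameters in one fixed order, skipping unset ones
    /// instead of printing "None" (the old output printed `limit=None`).
    fn fmt_scan_params(f: &mut impl Write, p: &ScanParams) -> fmt::Result {
        write!(f, "file_groups={}", p.file_groups)?;
        if let Some(cols) = p.projection {
            write!(f, ", projection=[{}]", cols.join(", "))?;
        }
        if let Some(limit) = p.limit {
            // Compact form: `limit=10` rather than `limit=Some(10)`
            write!(f, ", limit={limit}")?;
        }
        if let Some(ordering) = p.output_ordering {
            // The ordering now comes last, after the projection
            write!(f, ", output_ordering={ordering}")?;
        }
        Ok(())
    }

    fn main() -> fmt::Result {
        let mut line = String::from("ParquetExec: ");
        fmt_scan_params(
            &mut line,
            &ScanParams {
                file_groups: "{2 groups: [[x], [y]]}",
                projection: Some(&["c1"]),
                limit: None,
                output_ordering: Some("[c1@0 ASC]"),
            },
        )?;
        // ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[c1], output_ordering=[c1@0 ASC]
        println!("{line}");
        Ok(())
    }

The bulk of the diff below is mechanical: every expected plan string in the optimizer tests and sqllogictest files is rewritten from the old `limit=None, partitions=...` form to the new `file_groups=...` form.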

diff --git a/datafusion-cli/src/object_storage.rs b/datafusion-cli/src/object_storage.rs
index 1fd509775d..8d29f80a4b 100644
--- a/datafusion-cli/src/object_storage.rs
+++ b/datafusion-cli/src/object_storage.rs
@@ -112,7 +112,8 @@ fn get_bucket_name(url: &Url) -> Result<&str> {
 #[cfg(test)]
 mod tests {
     use datafusion::{
-        datasource::listing::ListingTableUrl, logical_expr::{LogicalPlan, DdlStatement},
+        datasource::listing::ListingTableUrl,
+        logical_expr::{DdlStatement, LogicalPlan},
         prelude::SessionContext,
     };
 
diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view.rs
index dea7ecadf7..391e4b93c4 100644
--- a/datafusion/core/src/datasource/view.rs
+++ b/datafusion/core/src/datasource/view.rs
@@ -473,7 +473,8 @@ mod tests {
         let formatted = arrow::util::pretty::pretty_format_batches(&plan)
             .unwrap()
             .to_string();
-        assert!(formatted.contains("ParquetExec: limit=Some(10)"));
+        assert!(formatted.contains("ParquetExec: "));
+        assert!(formatted.contains("projection=[bool_col, int_col], limit=10"));
         Ok(())
     }
 
diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
index 0674b6aa31..59e5fc95ed 100644
--- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
+++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
@@ -334,7 +334,7 @@ mod tests {
             "AggregateExec: mode=Final, gby=[], aggr=[COUNT(1)]",
             "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
             "AggregateExec: mode=Partial, gby=[], aggr=[COUNT(1)]",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
         ];
         assert_optimized!(expected, plan);
 
@@ -362,7 +362,7 @@ mod tests {
         let expected = &[
             "AggregateExec: mode=Final, gby=[], aggr=[COUNT(2)]",
             "AggregateExec: mode=Partial, gby=[], aggr=[COUNT(1)]",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
         ];
 
         assert_optimized!(expected, plan);
@@ -391,7 +391,7 @@ mod tests {
         // should combine the Partial/Final AggregateExecs to the Single AggregateExec
         let expected = &[
             "AggregateExec: mode=Single, gby=[], aggr=[COUNT(1)]",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
         ];
 
         assert_optimized!(expected, plan);
@@ -425,7 +425,7 @@ mod tests {
         // should combine the Partial/Final AggregateExecs to the Single AggregateExec
         let expected = &[
             "AggregateExec: mode=Single, gby=[c@2 as c], aggr=[Sum(b)]",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
         ];
 
         assert_optimized!(expected, plan);
diff --git a/datafusion/core/src/physical_optimizer/dist_enforcement.rs b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
index f74a7f2e93..e6fd15b7ce 100644
--- a/datafusion/core/src/physical_optimizer/dist_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
@@ -1273,12 +1273,12 @@ mod tests {
                             top_join_plan.as_str(),
                             join_plan.as_str(),
                             "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
-                            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+                            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                             "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1",
                             "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
-                            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+                            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                             "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
-                            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+                            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                         ],
                         // Should include 4 RepartitionExecs
                         _ => vec![
@@ -1286,12 +1286,12 @@ mod tests {
                             "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=10",
                             join_plan.as_str(),
                             "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
-                            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+                            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                             "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1",
                             "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
-                            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+                            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                             "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
-                            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+                            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                         ],
                     };
                     assert_optimized!(expected, top_join);
@@ -1329,12 +1329,12 @@ mod tests {
                                 top_join_plan.as_str(),
                                 join_plan.as_str(),
                                 "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
-                                "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+                                "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                                 "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1",
                                 "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
-                                "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+                                "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                                 "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
-                                "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+                                "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                             ],
                         // Should include 4 RepartitionExecs
                         _ =>
@@ -1343,12 +1343,12 @@ mod tests {
                                 "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 6 }], 10), input_partitions=10",
                                 join_plan.as_str(),
                                 "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
-                                "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+                                "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                                 "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1",
                                 "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
-                                "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+                                "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                                 "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
-                                "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+                                "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                             ],
                     };
                     assert_optimized!(expected, top_join);
@@ -1398,11 +1398,11 @@ mod tests {
             "ProjectionExec: expr=[a@0 as a1, a@0 as a2]",
             "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"a\", index: 0 }, Column { name: \"b\", index: 1 })]",
             "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
             "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 1 }], 10), input_partitions=1",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
             "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
         ];
         assert_optimized!(expected, top_join);
 
@@ -1420,11 +1420,11 @@ mod tests {
             "ProjectionExec: expr=[a@0 as a1, a@0 as a2]",
             "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"a\", index: 0 }, Column { name: \"b\", index: 1 })]",
             "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
             "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 1 }], 10), input_partitions=1",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
             "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
         ];
 
         assert_optimized!(expected, top_join);
@@ -1471,11 +1471,11 @@ mod tests {
             "ProjectionExec: expr=[c@2 as c1]",
             "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"a\", index: 0 }, Column { name: \"b\", index: 1 })]",
             "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
             "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 1 }], 10), input_partitions=1",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
             "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
         ];
 
         assert_optimized!(expected, top_join);
@@ -1508,11 +1508,11 @@ mod tests {
             "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
             "RepartitionExec: partitioning=Hash([Column { name: \"a1\", index: 0 }], 10), input_partitions=1",
             "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
             "AggregateExec: mode=FinalPartitioned, gby=[a2@0 as a2], aggr=[]",
             "RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 0 }], 10), input_partitions=1",
             "AggregateExec: mode=Partial, gby=[a@0 as a2], aggr=[]",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
         ];
         assert_optimized!(expected, join);
         Ok(())
@@ -1557,11 +1557,11 @@ mod tests {
             "AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1, a1@1 as a1], aggr=[]",
             "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 0 }, Column { name: \"a1\", index: 1 }], 10), input_partitions=1",
             "AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
             "AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[]",
             "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 0 }, Column { name: \"a\", index: 1 }], 10), input_partitions=1",
             "AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[]",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
         ];
         assert_optimized!(expected, join);
         Ok(())
@@ -1663,16 +1663,16 @@ mod tests {
             "ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]",
             "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"b\", index: 1 }, Column { name: \"b1\", index: 1 }), (Column { name: \"c\", index: 2 }, Column { name: \"c1\", index: 2 }), (Column { name: \"a\", index: 0 }, Column { name: \"a1\", index: 0 })]",
             "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 1 }, Column { name: \"c\", index: 2 }, Column { name: \"a\", index: 0 }], 10), input_partitions=1",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
             "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }, Column { name: \"c1\", index: 2 }, Column { name: \"a1\", index: 0 }], 10), input_partitions=1",
             "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
             "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"b\", index: 1 }, Column { name: \"b1\", index: 1 }), (Column { name: \"c\", index: 2 }, Column { name: \"c1\", index: 2 }), (Column { name: \"a\", index: 0 }, Column { name: \"a1\", index: 0 })]",
             "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 1 }, Column { name: \"c\", index: 2 }, Column { name: \"a\", index: 0 }], 10), input_partitions=1",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
             "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }, Column { name: \"c1\", index: 2 }, Column { name: \"a1\", index: 0 }], 10), input_partitions=1",
             "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
         ];
         assert_optimized!(expected, filter_top_join);
         Ok(())
@@ -1785,16 +1785,16 @@ mod tests {
                 "ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]",
                 "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"a\", index: 0 }, Column { name: \"a1\", index: 0 }), (Column { name: \"b\", index: 1 }, Column { name: \"b1\", index: 1 }), (Column { name: \"c\", index: 2 }, Column { name: \"c1\", index: 2 })]",
                 "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }, Column { name: \"b\", index: 1 }, Column { name: \"c\", index: 2 }], 10), input_partitions=1",
-                "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+                "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                 "RepartitionExec: partitioning=Hash([Column { name: \"a1\", index: 0 }, Column { name: \"b1\", index: 1 }, Column { name: \"c1\", index: 2 }], 10), input_partitions=1",
                 "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
-                "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+                "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                 "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"c\", index: 2 }, Column { name: \"c1\", index: 2 }), (Column { name: \"b\", index: 1 }, Column { name: \"b1\", index: 1 }), (Column { name: \"a\", index: 0 }, Column { name: \"a1\", index: 0 })]",
                 "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }, Column { name: \"b\", index: 1 }, Column { name: \"a\", index: 0 }], 10), input_partitions=1",
-                "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+                "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                 "RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 2 }, Column { name: \"b1\", index: 1 }, Column { name: \"a1\", index: 0 }], 10), input_partitions=1",
                 "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
-                "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+                "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
             ];
 
             assert_plan_txt!(expected, reordered);
@@ -1906,16 +1906,16 @@ mod tests {
                 "ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]",
                 "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"a\", index: 0 }, Column { name: \"a1\", index: 0 }), (Column { name: \"b\", index: 1 }, Column { name: \"b1\", index: 1 })]",
                 "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }, Column { name: \"b\", index: 1 }], 10), input_partitions=1",
-                "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+                "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                 "RepartitionExec: partitioning=Hash([Column { name: \"a1\", index: 0 }, Column { name: \"b1\", index: 1 }], 10), input_partitions=1",
                 "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
-                "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+                "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                 "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"c\", index: 2 }, Column { name: \"c1\", index: 2 }), (Column { name: \"b\", index: 1 }, Column { name: \"b1\", index: 1 }), (Column { name: \"a\", index: 0 }, Column { name: \"a1\", index: 0 })]",
                 "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }, Column { name: \"b\", index: 1 }, Column { name: \"a\", index: 0 }], 10), input_partitions=1",
-                "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+                "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                 "RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 2 }, Column { name: \"b1\", index: 1 }, Column { name: \"a1\", index: 0 }], 10), input_partitions=1",
                 "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
-                "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+                "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
             ];
 
             assert_plan_txt!(expected, reordered);
@@ -1980,14 +1980,14 @@ mod tests {
                         join_plan.as_str(),
                         "SortExec: expr=[a@0 ASC]",
                         "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
-                        "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+                        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                         "SortExec: expr=[b1@1 ASC]",
                         "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1",
                         "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
-                        "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+                        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                         "SortExec: expr=[c@2 ASC]",
                         "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
-                        "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+                        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                     ],
                 // Should include 4 RepartitionExecs
                 _ => vec![
@@ -1997,14 +1997,14 @@ mod tests {
                         join_plan.as_str(),
                         "SortExec: expr=[a@0 ASC]",
                         "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
-                        "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+                        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                         "SortExec: expr=[b1@1 ASC]",
                         "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1",
                         "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
-                        "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+                        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                         "SortExec: expr=[c@2 ASC]",
                         "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
-                        "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+                        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                 ],
             };
             assert_optimized!(expected, top_join);
@@ -2033,14 +2033,14 @@ mod tests {
                             join_plan.as_str(),
                             "SortExec: expr=[a@0 ASC]",
                             "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
-                            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+                            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                             "SortExec: expr=[b1@1 ASC]",
                             "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1",
                             "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
-                            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+                            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                             "SortExec: expr=[c@2 ASC]",
                             "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
-                            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+                            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                         ],
                         // Should include 4 RepartitionExecs and 4 SortExecs
                         _ => vec![
@@ -2050,14 +2050,14 @@ mod tests {
                             join_plan.as_str(),
                             "SortExec: expr=[a@0 ASC]",
                             "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
-                            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+                            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                             "SortExec: expr=[b1@1 ASC]",
                             "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1",
                             "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
-                            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+                            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                             "SortExec: expr=[c@2 ASC]",
                             "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
-                            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+                            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                         ],
                     };
                     assert_optimized!(expected, top_join);
@@ -2124,13 +2124,13 @@ mod tests {
             "AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1, a1@1 as a1], aggr=[]",
             "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 0 }, Column { name: \"a1\", index: 1 }], 10), input_partitions=1",
             "AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
             "SortExec: expr=[b2@1 ASC,a2@0 ASC]",
             "ProjectionExec: expr=[a@1 as a2, b@0 as b2]",
             "AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[]",
             "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 0 }, Column { name: \"a\", index: 1 }], 10), input_partitions=1",
             "AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[]",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
         ];
         assert_optimized!(expected, join);
         Ok(())
@@ -2159,7 +2159,7 @@ mod tests {
         let expected = &[
             "SortPreservingMergeExec: [a@0 ASC]",
             "CoalesceBatchesExec: target_batch_size=4096",
-            "ParquetExec: limit=None, partitions={2 groups: [[x], [y]]}, output_ordering=[a@0 ASC], projection=[a, b, c, d, e]",
+            "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]",
         ];
         assert_optimized!(expected, exec);
         Ok(())
@@ -2193,11 +2193,11 @@ mod tests {
             "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
             "RepartitionExec: partitioning=Hash([Column { name: \"a1\", index: 0 }], 10), input_partitions=1",
             "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
             "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
             "RepartitionExec: partitioning=Hash([Column { name: \"a1\", index: 0 }], 10), input_partitions=1",
             "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
         ];
         assert_optimized!(expected, plan);
         Ok(())
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index 1db61e379e..e7c72398fd 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -575,7 +575,7 @@ mod tests {
             "CoalescePartitionsExec",
             "AggregateExec: mode=Partial, gby=[], aggr=[]",
             "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
         ];
 
         assert_optimized!(expected, plan);
@@ -592,7 +592,7 @@ mod tests {
             "AggregateExec: mode=Partial, gby=[], aggr=[]",
             "FilterExec: c1@0",
             "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
         ];
 
         assert_optimized!(expected, plan);
@@ -610,7 +610,7 @@ mod tests {
             "FilterExec: c1@0",
             // nothing sorts the data, so the local limit doesn't require sorted data either
             "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
         ];
 
         assert_optimized!(expected, plan);
@@ -628,7 +628,7 @@ mod tests {
             "FilterExec: c1@0",
             // nothing sorts the data, so the local limit doesn't require sorted data either
             "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
         ];
 
         assert_optimized!(expected, plan);
@@ -644,7 +644,7 @@ mod tests {
             "LocalLimitExec: fetch=100",
             // data is sorted so can't repartition here
             "SortExec: expr=[c1@0 ASC]",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
         ];
 
         assert_optimized!(expected, plan);
@@ -662,7 +662,7 @@ mod tests {
             // data is sorted so can't repartition here even though
             // filter would benefit from parallelism, the answers might be wrong
             "SortExec: expr=[c1@0 ASC]",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
         ];
 
         assert_optimized!(expected, plan);
@@ -687,7 +687,7 @@ mod tests {
             "GlobalLimitExec: skip=0, fetch=100",
             "LocalLimitExec: fetch=100",
             // Expect no repartition to happen for local limit
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
         ];
 
         assert_optimized!(expected, plan);
@@ -714,7 +714,7 @@ mod tests {
             "GlobalLimitExec: skip=0, fetch=100",
             "LocalLimitExec: fetch=100",
             // Expect no repartition to happen for local limit
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
         ];
 
         assert_optimized!(expected, plan);
@@ -730,11 +730,11 @@ mod tests {
         let expected = &[
             "UnionExec",
             // Expect no repartition of ParquetExec
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
         ];
 
         assert_optimized!(expected, plan);
@@ -751,7 +751,7 @@ mod tests {
             "SortPreservingMergeExec: [c1@0 ASC]",
             "SortExec: expr=[c1@0 ASC]",
             "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
         ];
 
         assert_optimized!(expected, plan);
@@ -766,7 +766,7 @@ mod tests {
         // should not repartition / sort (as the data was already sorted)
         let expected = &[
             "SortPreservingMergeExec: [c1@0 ASC]",
-            "ParquetExec: limit=None, partitions={2 groups: [[x], [y]]}, output_ordering=[c1@0 ASC], projection=[c1]",
+            "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[c1], output_ordering=[c1@0 ASC]",
         ];
 
         assert_optimized!(expected, plan);
@@ -783,8 +783,8 @@ mod tests {
         let expected = &[
             "SortPreservingMergeExec: [c1@0 ASC]",
             "UnionExec",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]",
         ];
 
         assert_optimized!(expected, plan);
@@ -801,7 +801,7 @@ mod tests {
         // should not repartition as doing so destroys the necessary sort order
         let expected = &[
             "SortRequiredExec",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]",
         ];
 
         assert_optimized!(expected, plan);
@@ -830,11 +830,11 @@ mod tests {
             "UnionExec",
             // union input 1: no repartitioning
             "SortRequiredExec",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]",
             // union input 2: should repartition
             "FilterExec: c1@0",
             "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
         ];
 
         assert_optimized!(expected, plan);
@@ -852,7 +852,7 @@ mod tests {
             "SortExec: expr=[c1@0 ASC]",
             "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
             "ProjectionExec: expr=[c1@0 as c1]",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
         ];
 
         assert_optimized!(expected, plan);
@@ -869,7 +869,7 @@ mod tests {
         let expected = &[
             "SortPreservingMergeExec: [c1@0 ASC]",
             "ProjectionExec: expr=[c1@0 as c1]",
-            "ParquetExec: limit=None, partitions={2 groups: [[x], [y]]}, output_ordering=[c1@0 ASC], projection=[c1]",
+            "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[c1], output_ordering=[c1@0 ASC]",
         ];
 
         assert_optimized!(expected, plan);
@@ -884,7 +884,7 @@ mod tests {
         let expected = &[
             "SortExec: expr=[c1@0 ASC]",
             "ProjectionExec: expr=[c1@0 as c1]",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
         ];
 
         assert_optimized!(expected, plan);
@@ -902,7 +902,7 @@ mod tests {
             "SortExec: expr=[c1@0 ASC]",
             "FilterExec: c1@0",
             "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
         ];
 
         assert_optimized!(expected, plan);
@@ -924,7 +924,7 @@ mod tests {
             "FilterExec: c1@0",
             // repartition is lowest down
             "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
         ];
 
         assert_optimized!(expected, plan);
@@ -939,7 +939,7 @@ mod tests {
             "AggregateExec: mode=Final, gby=[], aggr=[]",
             "CoalescePartitionsExec",
             "AggregateExec: mode=Partial, gby=[], aggr=[]",
-            "ParquetExec: limit=None, partitions={2 groups: [[x:0..50], [x:50..100]]}, projection=[c1]",
+            "ParquetExec: file_groups={2 groups: [[x:0..50], [x:50..100]]}, projection=[c1]",
         ];
 
         assert_optimized!(expected, plan, 2, true, 10);
@@ -955,7 +955,7 @@ mod tests {
             "CoalescePartitionsExec",
             "AggregateExec: mode=Partial, gby=[], aggr=[]",
             // Plan already has two partitions
-            "ParquetExec: limit=None, partitions={2 groups: [[x], [y]]}, projection=[c1]",
+            "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[c1]",
         ];
 
         assert_optimized!(expected, plan, 2, true, 10);
@@ -971,7 +971,7 @@ mod tests {
             "CoalescePartitionsExec",
             "AggregateExec: mode=Partial, gby=[], aggr=[]",
             // Multiple source files split across partitions
-            "ParquetExec: limit=None, partitions={4 groups: [[x:0..75], [x:75..100, y:0..50], [y:50..125], [y:125..200]]}, projection=[c1]",
+            "ParquetExec: file_groups={4 groups: [[x:0..75], [x:75..100, y:0..50], [y:50..125], [y:125..200]]}, projection=[c1]",
         ];
 
         assert_optimized!(expected, plan, 4, true, 10);
@@ -988,7 +988,7 @@ mod tests {
             // data is sorted so can't repartition here
             "SortExec: expr=[c1@0 ASC]",
             // Doesn't parallelize for SortExec without preserve_partitioning
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
         ];
 
         assert_optimized!(expected, plan, 2, true, 10);
@@ -1007,7 +1007,7 @@ mod tests {
             // filter would benefit from parallelism, the answers might be wrong
             "SortExec: expr=[c1@0 ASC]",
             // SortExec doesn't benefit from input partitioning
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
         ];
 
         assert_optimized!(expected, plan, 2, true, 10);
@@ -1032,7 +1032,7 @@ mod tests {
             "GlobalLimitExec: skip=0, fetch=100",
             // Limit doesn't benefit from input partitioning - no parallelism
             "LocalLimitExec: fetch=100",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
         ];
 
         assert_optimized!(expected, plan, 2, true, 10);
@@ -1045,12 +1045,12 @@ mod tests {
 
         let expected = &[
             "UnionExec",
-            // Union doesn benefit from input partitioning - no parallelism
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+            // Union doesn't benefit from input partitioning - no parallelism
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
         ];
 
         assert_optimized!(expected, plan, 2, true, 10);
@@ -1064,7 +1064,7 @@ mod tests {
 
         // parallelization potentially could break sort order
         let expected = &[
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]",
         ];
 
         assert_optimized!(expected, plan, 2, true, 10);
@@ -1081,8 +1081,8 @@ mod tests {
         let expected = &[
             "SortPreservingMergeExec: [c1@0 ASC]",
             "UnionExec",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]",
         ];
 
         assert_optimized!(expected, plan, 2, true, 10);
@@ -1099,7 +1099,7 @@ mod tests {
         // no parallelization to preserve sort order
         let expected = &[
             "SortRequiredExec",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]",
         ];
 
         assert_optimized!(expected, plan, 2, true, 10);
@@ -1114,7 +1114,7 @@ mod tests {
         // data should not be repartitioned / resorted
         let expected = &[
             "ProjectionExec: expr=[c1@0 as c1]",
-            "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]",
         ];
 
         assert_optimized!(expected, plan, 2, true, 10);
diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index 1032a16c7f..713eaf288d 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -1748,11 +1748,11 @@ mod tests {
             "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
             "  RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
             "    UnionExec",
-            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+            "      ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
             "      GlobalLimitExec: skip=0, fetch=100",
             "        LocalLimitExec: fetch=100",
             "          SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
-            "            ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "            ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
         ];
 
         // We should keep the bottom `SortExec`.
@@ -1761,11 +1761,11 @@ mod tests {
             "  SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
             "    RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
             "      UnionExec",
-            "        ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+            "        ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
             "        GlobalLimitExec: skip=0, fetch=100",
             "          LocalLimitExec: fetch=100",
             "            SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
-            "              ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "              ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
         Ok(())
@@ -1838,9 +1838,9 @@ mod tests {
         let expected_input = vec![
             "SortPreservingMergeExec: [nullable_col@0 ASC]",
             "  UnionExec",
-            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+            "    ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
             "    SortExec: expr=[nullable_col@0 ASC]",
-            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "      ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
         ];
         // should not add a sort at the output of the union, input plan should not be changed
         let expected_optimized = expected_input.clone();
@@ -1869,9 +1869,9 @@ mod tests {
         let expected_input = vec![
             "SortPreservingMergeExec: [nullable_col@0 ASC]",
             "  UnionExec",
-            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC], projection=[nullable_col, non_nullable_col]",
+            "    ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC]",
             "    SortExec: expr=[nullable_col@0 ASC]",
-            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "      ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
         ];
         // should not add a sort at the output of the union, input plan should not be changed
         let expected_optimized = expected_input.clone();
@@ -1902,18 +1902,18 @@ mod tests {
         let expected_input = vec![
             "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
             "  UnionExec",
-            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+            "    ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
             "    SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
-            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "      ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
         ];
 
         let expected_optimized = vec![
             "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
             "  UnionExec",
             "    SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
-            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+            "      ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
             "    SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
-            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "      ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
         Ok(())
@@ -1945,20 +1945,20 @@ mod tests {
             "SortPreservingMergeExec: [nullable_col@0 ASC]",
             "  UnionExec",
             "    SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
-            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
-            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+            "      ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "    ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
             "    SortExec: expr=[nullable_col@0 ASC]",
-            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "      ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
         ];
         // should adjust sorting in the first input of the union such that it is not unnecessarily fine
         let expected_optimized = vec![
             "SortPreservingMergeExec: [nullable_col@0 ASC]",
             "  UnionExec",
             "    SortExec: expr=[nullable_col@0 ASC]",
-            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
-            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+            "      ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "    ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
             "    SortExec: expr=[nullable_col@0 ASC]",
-            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "      ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
         Ok(())
@@ -1990,20 +1990,20 @@ mod tests {
             "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
             "  UnionExec",
             "    SortExec: expr=[nullable_col@0 ASC]",
-            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
-            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+            "      ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "    ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
             "    SortExec: expr=[nullable_col@0 ASC]",
-            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "      ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
         ];
         let expected_optimized = vec![
             "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
             "  UnionExec",
             "    SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
-            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "      ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
             "    SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
-            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+            "      ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
             "    SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
-            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "      ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
         Ok(())
@@ -2043,17 +2043,17 @@ mod tests {
             "SortPreservingMergeExec: [nullable_col@0 ASC]",
             "  UnionExec",
             "    SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
-            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "      ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
             "    SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 DESC NULLS LAST]",
-            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "      ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
         ];
         let expected_optimized = vec![
             "SortPreservingMergeExec: [nullable_col@0 ASC]",
             "  UnionExec",
             "    SortExec: expr=[nullable_col@0 ASC]",
-            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "      ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
             "    SortExec: expr=[nullable_col@0 ASC]",
-            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "      ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
         Ok(())
@@ -2089,11 +2089,11 @@ mod tests {
             "SortPreservingMergeExec: [nullable_col@0 ASC]",
             "  UnionExec",
             "    SortExec: expr=[nullable_col@0 ASC]",
-            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
-            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+            "      ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "    ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
             "    SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
             "      RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
-            "        ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "        ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
         ];
         // Should adjust the requirement in the third input of the union so
         // that it is not unnecessarily fine.
@@ -2101,11 +2101,11 @@ mod tests {
             "SortPreservingMergeExec: [nullable_col@0 ASC]",
             "  UnionExec",
             "    SortExec: expr=[nullable_col@0 ASC]",
-            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
-            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+            "      ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "    ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
             "    SortExec: expr=[nullable_col@0 ASC]",
             "      RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
-            "        ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "        ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
         Ok(())
@@ -2132,18 +2132,18 @@ mod tests {
             "SortPreservingMergeExec: [nullable_col@0 ASC]",
             "  UnionExec",
             "    SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
-            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "      ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
             "    SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
-            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "      ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
         ];
         // Union preserves its inputs' ordering, so we should not change any of the SortExecs under UnionExec
         let expected_output = vec![
             "SortPreservingMergeExec: [nullable_col@0 ASC]",
             "  UnionExec",
             "    SortExec: expr=[nullable_col@0 ASC]",
-            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "      ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
             "    SortExec: expr=[nullable_col@0 ASC]",
-            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "      ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
         ];
         assert_optimized!(expected_input, expected_output, physical_plan);
         Ok(())
@@ -2186,16 +2186,16 @@ mod tests {
         let expected_input = vec![
             "UnionExec",
             "  SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
-            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "    ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
             "  SortExec: expr=[nullable_col@0 DESC NULLS LAST,non_nullable_col@1 DESC NULLS LAST]",
-            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "    ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
         ];
         // Since `UnionExec` doesn't preserve ordering in the plan above,
         // we shouldn't keep the SortExecs in the plan.
         let expected_optimized = vec![
             "UnionExec",
-            "  ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
-            "  ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "  ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "  ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
         Ok(())
@@ -2237,16 +2237,16 @@ mod tests {
             "  SortPreservingMergeExec: [nullable_col@0 DESC NULLS LAST]",
             "    UnionExec",
             "      SortExec: expr=[nullable_col@0 DESC NULLS LAST]",
-            "        ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC], projection=[nullable_col, non_nullable_col]",
+            "        ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC]",
             "      SortExec: expr=[nullable_col@0 DESC NULLS LAST]",
-            "        ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+            "        ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
         ];
         let expected_optimized = vec![
             "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(NULL) }]",
             "  SortPreservingMergeExec: [nullable_col@0 ASC]",
             "    UnionExec",
-            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC], projection=[nullable_col, non_nullable_col]",
-            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+            "      ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC]",
+            "      ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
         Ok(())
@@ -2277,16 +2277,16 @@ mod tests {
             "  SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
             "    UnionExec",
             "      SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
-            "        ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+            "        ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
             "      SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
-            "        ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+            "        ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
         ];
         let expected_optimized = vec![
             "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }], mode=[Sorted]",
             "  SortPreservingMergeExec: [nullable_col@0 ASC]",
             "    UnionExec",
-            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
-            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+            "      ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
+            "      ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
         Ok(())
@@ -2327,21 +2327,21 @@ mod tests {
             "SortPreservingMergeExec: [nullable_col@0 ASC]",
             "  UnionExec",
             "    SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
-            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "      ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
             "    GlobalLimitExec: skip=0, fetch=100",
             "      LocalLimitExec: fetch=100",
             "        SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 DESC NULLS LAST]",
-            "          ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "          ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
         ];
         let expected_optimized = vec![
             "SortPreservingMergeExec: [nullable_col@0 ASC]",
             "  UnionExec",
             "    SortExec: expr=[nullable_col@0 ASC]",
-            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "      ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
             "    GlobalLimitExec: skip=0, fetch=100",
             "      LocalLimitExec: fetch=100",
             "        SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 DESC NULLS LAST]",
-            "          ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "          ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
         Ok(())
@@ -2385,8 +2385,8 @@ mod tests {
             let expected_input = vec![
                 "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
                 join_plan2.as_str(),
-                "    ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
-                "    ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]",
+                "    ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+                "    ParquetExec: file_groups={1 group: [[x]]}, projection=[col_a, col_b]",
             ];
             let expected_optimized = match join_type {
                 JoinType::Inner
@@ -2397,9 +2397,9 @@ mod tests {
                     vec![
                         join_plan.as_str(),
                         "  SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
-                        "    ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+                        "    ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
                         "  SortExec: expr=[col_a@0 ASC]",
-                        "    ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]",
+                        "    ParquetExec: file_groups={1 group: [[x]]}, projection=[col_a, col_b]",
                     ]
                 }
                 _ => {
@@ -2408,9 +2408,9 @@ mod tests {
                         "SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
                         join_plan2.as_str(),
                         "    SortExec: expr=[nullable_col@0 ASC]",
-                        "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+                        "      ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
                         "    SortExec: expr=[col_a@0 ASC]",
-                        "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]",
+                        "      ParquetExec: file_groups={1 group: [[x]]}, projection=[col_a, col_b]",
                     ]
                 }
             };
@@ -2462,8 +2462,8 @@ mod tests {
             let expected_input = vec![
                 spm_plan,
                 join_plan2.as_str(),
-                "    ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
-                "    ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]",
+                "    ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+                "    ParquetExec: file_groups={1 group: [[x]]}, projection=[col_a, col_b]",
             ];
             let expected_optimized = match join_type {
                 JoinType::Inner | JoinType::Right | JoinType::RightAnti => {
@@ -2471,9 +2471,9 @@ mod tests {
                     vec![
                         join_plan.as_str(),
                         "  SortExec: expr=[nullable_col@0 ASC]",
-                        "    ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+                        "    ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
                         "  SortExec: expr=[col_a@0 ASC,col_b@1 ASC]",
-                        "    ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]",
+                        "    ParquetExec: file_groups={1 group: [[x]]}, projection=[col_a, col_b]",
                     ]
                 }
                 _ => {
@@ -2482,9 +2482,9 @@ mod tests {
                         "SortExec: expr=[col_a@2 ASC,col_b@3 ASC]",
                         join_plan2.as_str(),
                         "    SortExec: expr=[nullable_col@0 ASC]",
-                        "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+                        "      ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
                         "    SortExec: expr=[col_a@0 ASC]",
-                        "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]",
+                        "      ParquetExec: file_groups={1 group: [[x]]}, projection=[col_a, col_b]",
                     ]
                 }
             };
@@ -2519,8 +2519,8 @@ mod tests {
         let expected_input = vec![
             "SortPreservingMergeExec: [col_b@3 ASC,col_a@2 ASC]",
             "  SortMergeJoin: join_type=Inner, on=[(Column { name: \"nullable_col\", index: 0 }, Column { name: \"col_a\", index: 0 })]",
-            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
-            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]",
+            "    ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "    ParquetExec: file_groups={1 group: [[x]]}, projection=[col_a, col_b]",
         ];
 
         // cannot push down the sort requirements; need to add SortExec
@@ -2528,9 +2528,9 @@ mod tests {
             "SortExec: expr=[col_b@3 ASC,col_a@2 ASC]",
             "  SortMergeJoin: join_type=Inner, on=[(Column { name: \"nullable_col\", index: 0 }, Column { name: \"col_a\", index: 0 })]",
             "    SortExec: expr=[nullable_col@0 ASC]",
-            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "      ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
             "    SortExec: expr=[col_a@0 ASC]",
-            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]",
+            "      ParquetExec: file_groups={1 group: [[x]]}, projection=[col_a, col_b]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
 
@@ -2545,8 +2545,8 @@ mod tests {
         let expected_input = vec![
             "SortPreservingMergeExec: [nullable_col@0 ASC,col_b@3 ASC,col_a@2 ASC]",
             "  SortMergeJoin: join_type=Inner, on=[(Column { name: \"nullable_col\", index: 0 }, Column { name: \"col_a\", index: 0 })]",
-            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
-            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]",
+            "    ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "    ParquetExec: file_groups={1 group: [[x]]}, projection=[col_a, col_b]",
         ];
 
         // cannot push down the sort requirements; need to add SortExec
@@ -2554,9 +2554,9 @@ mod tests {
             "SortExec: expr=[nullable_col@0 ASC,col_b@3 ASC,col_a@2 ASC]",
             "  SortMergeJoin: join_type=Inner, on=[(Column { name: \"nullable_col\", index: 0 }, Column { name: \"col_a\", index: 0 })]",
             "    SortExec: expr=[nullable_col@0 ASC]",
-            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "      ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
             "    SortExec: expr=[col_a@0 ASC]",
-            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]",
+            "      ParquetExec: file_groups={1 group: [[x]]}, projection=[col_a, col_b]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
 
@@ -2627,14 +2627,14 @@ mod tests {
             "  FilterExec: NOT non_nullable_col@1",
             "    CoalescePartitionsExec",
             "      RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
-            "        ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "        ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
         ];
         let expected_optimized = vec![
             "SortPreservingMergeExec: [nullable_col@0 ASC]",
             "  SortExec: expr=[nullable_col@0 ASC]",
             "    FilterExec: NOT non_nullable_col@1",
             "      RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
-            "        ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "        ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
         Ok(())
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs
index 87d3ca1ec7..16dc146750 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -136,12 +136,7 @@ impl ExecutionPlan for AvroExec {
     ) -> std::fmt::Result {
         match t {
             DisplayFormatType::Default => {
-                write!(
-                    f,
-                    "AvroExec: files={}, limit={:?}",
-                    super::FileGroupsDisplay(&self.base_config.file_groups),
-                    self.base_config.limit,
-                )
+                write!(f, "AvroExec: {}", self.base_config)
             }
         }
     }
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index 886c609e72..e2d2bb8ef7 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -172,11 +172,8 @@ impl ExecutionPlan for CsvExec {
             DisplayFormatType::Default => {
                 write!(
                     f,
-                    "CsvExec: files={}, has_header={}, limit={:?}, projection={}",
-                    super::FileGroupsDisplay(&self.base_config.file_groups),
-                    self.has_header,
-                    self.base_config.limit,
-                    super::ProjectSchemaDisplay(&self.projected_schema),
+                    "CsvExec: {}, has_header={}",
+                    self.base_config, self.has_header,
                 )
             }
         }
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index cef69883e9..d122fd78b4 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -146,12 +146,7 @@ impl ExecutionPlan for NdJsonExec {
     ) -> std::fmt::Result {
         match t {
             DisplayFormatType::Default => {
-                write!(
-                    f,
-                    "JsonExec: limit={:?}, files={}",
-                    self.base_config.limit,
-                    super::FileGroupsDisplay(&self.base_config.file_groups),
-                )
+                write!(f, "JsonExec: {}", self.base_config)
             }
         }
     }
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index 4f85f54113..6591d83465 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -234,6 +234,29 @@ impl FileScanConfig {
     }
 }
 
+impl Display for FileScanConfig {
+    fn fmt(&self, f: &mut Formatter) -> FmtResult {
+        let (schema, _, ordering) = self.project();
+
+        write!(f, "file_groups={}", FileGroupsDisplay(&self.file_groups))?;
+
+        if !schema.fields().is_empty() {
+            write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
+        }
+
+        if let Some(limit) = self.limit {
+            write!(f, ", limit={}", limit)?;
+        }
+
+        if let Some(orders) = ordering {
+            if !orders.is_empty() {
+                write!(f, ", output_ordering={}", OutputOrderingDisplay(&orders))?;
+            }
+        }
+        Ok(())
+    }
+}
+
 /// A wrapper to customize partitioned file display
 ///
 /// Prints in the format:
@@ -291,6 +314,23 @@ impl<'a> Display for ProjectSchemaDisplay<'a> {
     }
 }
 
+/// A wrapper to customize output ordering display.
+#[derive(Debug)]
+struct OutputOrderingDisplay<'a>(&'a [PhysicalSortExpr]);
+
+impl<'a> Display for OutputOrderingDisplay<'a> {
+    fn fmt(&self, f: &mut Formatter) -> FmtResult {
+        write!(f, "[")?;
+        for (i, e) in self.0.iter().enumerate() {
+            if i > 0 {
+                write!(f, ", ")?
+            }
+            write!(f, "{e}")?;
+        }
+        write!(f, "]")
+    }
+}
+
 /// A utility which can adapt file-level record batches to a table schema which may have a schema
 /// obtained from merging multiple file-level schemas.
 ///
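
The `Display` impl above is what every file-format exec node now delegates to: `FileGroupsDisplay`, `ProjectSchemaDisplay`, and the new `OutputOrderingDisplay` are all newtype wrappers over a slice, each supplying a `Display` impl so the shared scan parameters render identically everywhere. A minimal, self-contained sketch of that wrapper pattern, with a hypothetical `SortExpr` standing in for `PhysicalSortExpr` so the snippet compiles on its own:

    use std::fmt::{self, Display, Formatter};

    /// Hypothetical stand-in for `PhysicalSortExpr`; only its `Display` matters here.
    struct SortExpr(&'static str);

    impl Display for SortExpr {
        fn fmt(&self, f: &mut Formatter) -> fmt::Result {
            write!(f, "{}", self.0)
        }
    }

    /// Newtype wrapper that renders a slice as `[a, b, c]`, mirroring
    /// `OutputOrderingDisplay` above.
    struct OrderingDisplay<'a>(&'a [SortExpr]);

    impl<'a> Display for OrderingDisplay<'a> {
        fn fmt(&self, f: &mut Formatter) -> fmt::Result {
            write!(f, "[")?;
            for (i, e) in self.0.iter().enumerate() {
                if i > 0 {
                    write!(f, ", ")?;
                }
                write!(f, "{e}")?;
            }
            write!(f, "]")
        }
    }

    fn main() {
        let ordering = [
            SortExpr("nullable_col@0 ASC"),
            SortExpr("non_nullable_col@1 ASC"),
        ];
        // Prints: output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC]
        println!("output_ordering={}", OrderingDisplay(&ordering));
    }

The same shape scales to any new field: add a wrapper, implement `Display` on it, and compose it in `FileScanConfig::fmt`.
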
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index c18b8fab74..0afb819d43 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -86,7 +86,7 @@ pub struct ParquetExec {
     /// Override for `Self::with_enable_page_index`. If None, uses
     /// values from base_config
     enable_page_index: Option<bool>,
-    /// Base configuraton for this scan
+    /// Base configuration for this scan
     base_config: FileScanConfig,
     projected_statistics: Statistics,
     projected_schema: SchemaRef,
@@ -419,20 +419,10 @@ impl ExecutionPlan for ParquetExec {
                     .map(|pre| format!(", pruning_predicate={}", pre.predicate_expr()))
                     .unwrap_or_default();
 
-                let output_ordering_string = self
-                    .output_ordering()
-                    .map(make_output_ordering_string)
-                    .unwrap_or_default();
-
                 write!(
                     f,
-                    "ParquetExec: limit={:?}, partitions={}{}{}{}, projection={}",
-                    self.base_config.limit,
-                    super::FileGroupsDisplay(&self.base_config.file_groups),
-                    predicate_string,
-                    pruning_predicate_string,
-                    output_ordering_string,
-                    super::ProjectSchemaDisplay(&self.projected_schema),
+                    "ParquetExec: {}{}{}",
+                    self.base_config, predicate_string, pruning_predicate_string,
                 )
             }
         }
@@ -447,20 +437,6 @@ impl ExecutionPlan for ParquetExec {
     }
 }
 
-fn make_output_ordering_string(ordering: &[PhysicalSortExpr]) -> String {
-    use std::fmt::Write;
-    let mut w: String = ", output_ordering=[".into();
-
-    for (i, e) in ordering.iter().enumerate() {
-        if i > 0 {
-            write!(&mut w, ", ").unwrap()
-        }
-        write!(&mut w, "{e}").unwrap()
-    }
-    write!(&mut w, "]").unwrap();
-    w
-}
-
 /// Implements [`FileOpener`] for a parquet file
 struct ParquetOpener {
     partition_index: usize,
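
With the shared fields moved into `FileScanConfig`, `ParquetExec::fmt` keeps only its parquet-specific suffixes, each pre-rendered with `map(...).unwrap_or_default()` so an absent predicate leaves no trace in the output. A rough sketch of that conditional-suffix idiom under assumed names (`Scan` and its fields are hypothetical, not the real `ParquetExec` layout):

    use std::fmt::{self, Display, Formatter};

    /// Hypothetical scan node: `base` stands in for `FileScanConfig`,
    /// `predicate` for the optional pruning predicate.
    struct Scan {
        base: String,
        predicate: Option<String>,
    }

    impl Display for Scan {
        fn fmt(&self, f: &mut Formatter) -> fmt::Result {
            // Render the optional field as a ", key=value" suffix, or as
            // nothing at all when it is absent.
            let predicate = self
                .predicate
                .as_ref()
                .map(|p| format!(", predicate={p}"))
                .unwrap_or_default();
            write!(f, "ParquetExec: {}{}", self.base, predicate)
        }
    }

    fn main() {
        let scan = Scan {
            base: "file_groups={1 group: [[x]]}".into(),
            predicate: Some("c1@0 > 10".into()),
        };
        // Prints: ParquetExec: file_groups={1 group: [[x]]}, predicate=c1@0 > 10
        println!("{scan}");
    }
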
diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
index 4356d4aa85..9013fb2692 100644
--- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
@@ -2454,10 +2454,10 @@ mod tests {
                 "SymmetricHashJoinExec: join_type=Full, on=[(Column { name: \"a2\", index: 1 }, Column { name: \"a2\", index: 1 })], filter=BinaryExpr { left: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 0 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Gt, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 1 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Plus, right: Literal { value: Int64(3) } } },  [...]
                 "  CoalesceBatchesExec: target_batch_size=8192",
                 "    RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 1 }], 8), input_partitions=1",
-                // "   CsvExec: files={1 group: [[tempdir/left.csv]]}, has_header=false, limit=None, projection=[a1, a2]",
+                // "   CsvExec: file_groups={1 group: [[tempdir/left.csv]]}, projection=[a1, a2], has_header=false",
                 "  CoalesceBatchesExec: target_batch_size=8192",
                 "    RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 1 }], 8), input_partitions=1",
-                // "   CsvExec: files={1 group: [[tempdir/right.csv]]}, has_header=false, limit=None, projection=[a1, a2]"
+                // "   CsvExec: file_groups={1 group: [[tempdir/right.csv]]}, projection=[a1, a2], has_header=false"
             ]
         };
         let mut actual: Vec<&str> = formatted.trim().lines().collect();
@@ -2507,10 +2507,10 @@ mod tests {
                 "SymmetricHashJoinExec: join_type=Full, on=[(Column { name: \"a2\", index: 1 }, Column { name: \"a2\", index: 1 })], filter=BinaryExpr { left: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 0 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Gt, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 1 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Plus, right: Literal { value: Int64(3) } } },  [...]
                 "  CoalesceBatchesExec: target_batch_size=8192",
                 "    RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 1 }], 8), input_partitions=1",
-                // "   CsvExec: files={1 group: [[tempdir/left.csv]]}, has_header=false, limit=None, projection=[a1, a2]",
+                // "   CsvExec: file_groups={1 group: [[tempdir/left.csv]]}, projection=[a1, a2], has_header=false",
                 "  CoalesceBatchesExec: target_batch_size=8192",
                 "    RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 1 }], 8), input_partitions=1",
-                // "   CsvExec: files={1 group: [[tempdir/right.csv]]}, has_header=false, limit=None, projection=[a1, a2]"
+                // "   CsvExec: file_groups={1 group: [[tempdir/right.csv]]}, projection=[a1, a2], has_header=false"
             ]
         };
         let mut actual: Vec<&str> = formatted.trim().lines().collect();
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index 00850f3237..c8edf701cf 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -328,7 +328,7 @@ pub fn with_new_children_if_necessary(
 ///   assert_eq!("CoalesceBatchesExec: target_batch_size=8192\
 ///              \n  FilterExec: a@0 < 5\
 ///              \n    RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1\
-///              \n      CsvExec: files={1 group: [[WORKING_DIR/tests/data/example.csv]]}, has_header=true, limit=None, projection=[a]",
+///              \n      CsvExec: file_groups={1 group: [[WORKING_DIR/tests/data/example.csv]]}, projection=[a], has_header=true",
 ///               plan_string.trim());
 ///
 ///   let one_line = format!("{}", displayable_plan.one_line());
diff --git a/datafusion/core/tests/sql/avro.rs b/datafusion/core/tests/sql/avro.rs
index d933db067d..85ed30044c 100644
--- a/datafusion/core/tests/sql/avro.rs
+++ b/datafusion/core/tests/sql/avro.rs
@@ -149,7 +149,7 @@ async fn avro_explain() {
             \n  CoalescePartitionsExec\
             \n    AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]\
             \n      RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1\
-            \n        AvroExec: files={1 group: [[ARROW_TEST_DATA/avro/alltypes_plain.avro]]}, limit=None\
+            \n        AvroExec: file_groups={1 group: [[ARROW_TEST_DATA/avro/alltypes_plain.avro]]}, projection=[id]\
             \n",
         ],
     ];
diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs
index 6f7150d2a5..1ab933022d 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -609,7 +609,7 @@ async fn test_physical_plan_display_indent() {
         "                CoalesceBatchesExec: target_batch_size=4096",
         "                  FilterExec: c12@1 < 10",
         "                    RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1",
-        "                      CsvExec: files={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c12]",
+        "                      CsvExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1, c12], has_header=true",
     ];
 
     let normalizer = ExplainNormalizer::new();
@@ -650,12 +650,12 @@ async fn test_physical_plan_display_indent_multi_children() {
         "      CoalesceBatchesExec: target_batch_size=4096",
         "        RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 9000), input_partitions=9000",
         "          RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1",
-        "            CsvExec: files={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1]",
+        "            CsvExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true",
         "      CoalesceBatchesExec: target_batch_size=4096",
         "        RepartitionExec: partitioning=Hash([Column { name: \"c2\", index: 0 }], 9000), input_partitions=9000",
         "          RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1",
         "            ProjectionExec: expr=[c1@0 as c2]",
-        "              CsvExec: files={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1]",
+        "              CsvExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true",
     ];
 
     let normalizer = ExplainNormalizer::new();
diff --git a/datafusion/core/tests/sql/json.rs b/datafusion/core/tests/sql/json.rs
index 965a9c14fc..10fcdfda20 100644
--- a/datafusion/core/tests/sql/json.rs
+++ b/datafusion/core/tests/sql/json.rs
@@ -92,7 +92,7 @@ async fn json_explain() {
             \n  CoalescePartitionsExec\
             \n    AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]\
             \n      RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1\
-            \n        JsonExec: limit=None, files={1 group: [[WORKING_DIR/tests/jsons/2.json]]}\n",
+            \n        JsonExec: file_groups={1 group: [[WORKING_DIR/tests/jsons/2.json]]}, projection=[a]\n",
         ],
     ];
     assert_eq!(expected, actual);
diff --git a/datafusion/core/tests/sqllogictests/test_files/explain.slt b/datafusion/core/tests/sqllogictests/test_files/explain.slt
index fe1d3ac2e4..0fdc9bf61a 100644
--- a/datafusion/core/tests/sqllogictests/test_files/explain.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/explain.slt
@@ -47,7 +47,41 @@ ProjectionExec: expr=[c1@0 as c1]
   CoalesceBatchesExec: target_batch_size=8192
     FilterExec: c2@1 > 10
       RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-        CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2]
+        CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2], has_header=true
+
+# explain_csv_exec_scan_config
+
+statement ok
+CREATE EXTERNAL TABLE aggregate_test_100_with_order (
+        c1  VARCHAR NOT NULL,
+        c2  TINYINT NOT NULL,
+        c3  SMALLINT NOT NULL,
+        c4  SMALLINT NOT NULL,
+        c5  INTEGER NOT NULL,
+        c6  BIGINT NOT NULL,
+        c7  SMALLINT NOT NULL,
+        c8  INT NOT NULL,
+        c9  INT UNSIGNED NOT NULL,
+        c10 BIGINT UNSIGNED NOT NULL,
+        c11 FLOAT NOT NULL,
+        c12 DOUBLE NOT NULL,
+        c13 VARCHAR NOT NULL
+    )
+STORED AS CSV
+WITH HEADER ROW
+WITH ORDER (c1 ASC)
+LOCATION '../../testing/data/csv/aggregate_test_100.csv';
+
+query TT
+explain SELECT c1 FROM aggregate_test_100_with_order order by c1 ASC limit 10
+----
+logical_plan
+Limit: skip=0, fetch=10
+  Sort: aggregate_test_100_with_order.c1 ASC NULLS LAST, fetch=10
+    TableScan: aggregate_test_100_with_order projection=[c1]
+physical_plan
+GlobalLimitExec: skip=0, fetch=10
+  CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], output_ordering=[c1@0 ASC NULLS LAST], has_header=true
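
The test above exercises the ordering display end to end: `WITH ORDER (c1 ASC)` declares the CSV pre-sorted, so the optimizer needs no SortExec and `CsvExec` reports `output_ordering=[c1@0 ASC NULLS LAST]`. Roughly the same experiment from Rust, as a sketch that assumes a Tokio runtime and a local `data.csv` (placeholder path and schema):

    use datafusion::error::Result;
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        // Declare the file as already sorted by c1; the planner can then
        // satisfy ORDER BY c1 without inserting a SortExec.
        ctx.sql(
            "CREATE EXTERNAL TABLE t (c1 VARCHAR NOT NULL) \
             STORED AS CSV WITH HEADER ROW WITH ORDER (c1 ASC) \
             LOCATION 'data.csv'",
        )
        .await?;
        ctx.sql("EXPLAIN SELECT c1 FROM t ORDER BY c1 ASC LIMIT 10")
            .await?
            .show()
            .await?;
        Ok(())
    }
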
 
 
 ## explain_physical_plan_only
diff --git a/datafusion/core/tests/sqllogictests/test_files/order.slt b/datafusion/core/tests/sqllogictests/test_files/order.slt
index d42d2cf62f..3b77319c0c 100644
--- a/datafusion/core/tests/sqllogictests/test_files/order.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/order.slt
@@ -165,7 +165,7 @@ Projection: aggregate_test_100.c1, aggregate_test_100.c2
 physical_plan
 ProjectionExec: expr=[c1@0 as c1, c2@1 as c2]
   SortExec: expr=[c2@1 ASC NULLS LAST,c3@2 ASC NULLS LAST]
-    CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c3]
+    CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3], has_header=true
 
 query II
 SELECT c2, c3 FROM aggregate_test_100 ORDER BY c2, c3, c2
diff --git a/datafusion/core/tests/sqllogictests/test_files/select.slt b/datafusion/core/tests/sqllogictests/test_files/select.slt
index ff523d713d..26db6cc059 100644
--- a/datafusion/core/tests/sqllogictests/test_files/select.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/select.slt
@@ -661,9 +661,8 @@ select * from largeutf8_data where large_str != '1'
 statement ok
 CREATE TABLE empty_table;
 
-query
+statement ok
 SELECT * FROM empty_table
-----
 
 
 # TODO: boolean_literal
diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt b/datafusion/core/tests/sqllogictests/test_files/window.slt
index 5ba3644923..66b011eeb9 100644
--- a/datafusion/core/tests/sqllogictests/test_files/window.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/window.slt
@@ -1217,7 +1217,7 @@ ProjectionExec: expr=[c9@0 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate
     ProjectionExec: expr=[c9@1 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as SUM(aggregate_test_100.c9)]
       BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
         SortExec: expr=[c9@1 ASC NULLS LAST,c8@0 ASC NULLS LAST]
-          CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c8, c9]
+          CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c8, c9], has_header=true
 
 
 # over_order_by_sort_keys_sorting_prefix_compacting
@@ -1238,7 +1238,7 @@ ProjectionExec: expr=[c2@0 as c2, MAX(aggregate_test_100.c9) ORDER BY [aggregate
     BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c9): Ok(Field { name: "MAX(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }], mode=[Sorted]
       BoundedWindowAggExec: wdw=[MIN(aggregate_test_100.c9): Ok(Field { name: "MIN(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }], mode=[Sorted]
         SortExec: expr=[c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]
-          CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c2, c9]
+          CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c9], has_header=true
 
 
 # FIXME: for now we are not detecting a prefix of the sorting keys that would let us re-arrange with a global sort and save one SortExec
@@ -1263,7 +1263,7 @@ SortExec: expr=[c2@0 ASC NULLS LAST]
         SortExec: expr=[c9@1 ASC NULLS LAST,c2@0 ASC NULLS LAST]
           BoundedWindowAggExec: wdw=[MIN(aggregate_test_100.c9): Ok(Field { name: "MIN(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }], mode=[Sorted]
             SortExec: expr=[c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]
-              CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c2, c9]
+              CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c9], has_header=true
 
 # test_window_partition_by_order_by
 statement ok
@@ -1293,7 +1293,7 @@ ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_10
                 CoalesceBatchesExec: target_batch_size=4096
                   RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }, Column { name: "c2", index: 1 }], 2), input_partitions=2
                     RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-                      CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c4]
+                      CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c4], has_header=true
 
 
 # test_window_agg_sort_reversed_plan
@@ -1318,7 +1318,7 @@ ProjectionExec: expr=[c9@0 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate
     BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }], mode=[Sorted]
       BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
         SortExec: expr=[c9@0 DESC]
-          CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c9]
+          CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true
 
 query III
 SELECT
@@ -1359,7 +1359,7 @@ ProjectionExec: expr=[c9@0 as c9, FIRST_VALUE(aggregate_test_100.c9) ORDER BY [a
     BoundedWindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: "FIRST_VALUE(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: "LAG(aggregate_test_100.c9,Int64(2),Int64(10101))", data_type: UInt64, nullable: true, dict_id: 0, dict_is_orde [...]
       BoundedWindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: "FIRST_VALUE(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: "LAG(aggregate_test_100.c9,Int64(2),Int64(10101))", data_type: UInt64, nullable: true, dict_id: 0, dict_is_or [...]
         SortExec: expr=[c9@0 DESC]
-          CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c9]
+          CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true
 
 query IIIIIII
 SELECT
@@ -1403,7 +1403,7 @@ ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 A
       SortExec: expr=[c9@0 ASC NULLS LAST]
         BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
           SortExec: expr=[c9@0 DESC]
-            CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c9]
+            CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true
 
 
 query III
@@ -1446,7 +1446,7 @@ ProjectionExec: expr=[c9@2 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate
         BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
           BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
             SortExec: expr=[c9@2 DESC,c1@0 DESC]
-              CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c9]
+              CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c9], has_header=true
 
 query IIII
 SELECT
@@ -1537,7 +1537,7 @@ ProjectionExec: expr=[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST]
                           WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: P [...]
                             BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
                               SortExec: expr=[c3@2 DESC,c1@0 ASC NULLS LAST]
-                                CsvExec: files={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/null_cases.csv]]}, has_header=true, limit=None, projection=[c1, c2, c3]
+                                CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/null_cases.csv]]}, projection=[c1, c2, c3], has_header=true
 
 query IIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIII
 SELECT
@@ -1612,7 +1612,7 @@ ProjectionExec: expr=[c9@1 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate
     BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
       BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
         SortExec: expr=[c1@0 ASC NULLS LAST,c9@1 DESC]
-          CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c9]
+          CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c9], has_header=true
 
 
 query III
@@ -1656,7 +1656,7 @@ ProjectionExec: expr=[c9@1 as c9, SUM(aggregate_test_100.c9) PARTITION BY [aggre
     BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }], mode=[Sorted]
       BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
         SortExec: expr=[c1@0 ASC NULLS LAST,c9@1 DESC]
-          CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c9]
+          CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c9], has_header=true
 
 query III
 SELECT
@@ -1702,7 +1702,7 @@ ProjectionExec: expr=[c3@0 as c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate
       ProjectionExec: expr=[c3@1 as c3, c4@2 as c4, c9@3 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as SUM(aggregate_test_100.c9)]
         BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int16(NULL)), end_bound: CurrentRow }], mode=[Sorted]
           SortExec: expr=[c3@1 + c4@2 DESC,c9@3 DESC,c2@0 ASC NULLS LAST]
-            CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c2, c3, c4, c9]
+            CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3, c4, c9], has_header=true
 
 
 query III
@@ -1756,7 +1756,7 @@ ProjectionExec: expr=[COUNT(UInt8(1))@0 as global_count]
                       CoalesceBatchesExec: target_batch_size=4096
                         FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434
                           RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-                            CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c13]
+                            CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c13], has_header=true
 
 query I
 SELECT count(*) as global_count FROM
@@ -1802,7 +1802,7 @@ GlobalLimitExec: skip=0, fetch=5
                 ProjectionExec: expr=[c3@1 as c3, c9@2 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as SUM(aggregate_test_100.c9)]
                   BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int16(NULL)), end_bound: CurrentRow }], mode=[Sorted]
                     SortExec: expr=[c3@1 DESC,c9@2 DESC,c2@0 ASC NULLS LAST]
-                      CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c2, c3, c9]
+                      CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3, c9], has_header=true
 
 
 
@@ -1841,7 +1841,7 @@ SortPreservingMergeExec: [c1@0 ASC NULLS LAST]
         CoalesceBatchesExec: target_batch_size=4096
           RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 2), input_partitions=2
             RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-              CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1]
+              CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true
 
 query TI
 SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM aggregate_test_100 ORDER BY c1 ASC
@@ -1968,7 +1968,7 @@ SortExec: expr=[c1@0 ASC NULLS LAST]
           CoalesceBatchesExec: target_batch_size=4096
             RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 2), input_partitions=2
               RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-                CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1]
+                CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true
 
 statement ok
 set datafusion.optimizer.repartition_sorts = true;
@@ -1997,7 +1997,7 @@ SortExec: expr=[c1@0 ASC NULLS LAST]
               CoalesceBatchesExec: target_batch_size=4096
                 RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 2), input_partitions=2
                   RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-                    CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c9]
+                    CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c9], has_header=true
 
 # test_window_agg_with_global_limit
 statement ok
@@ -2020,7 +2020,7 @@ ProjectionExec: expr=[ARRAYAGG(aggregate_test_100.c13)@0 as array_agg1]
         RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
           GlobalLimitExec: skip=0, fetch=1
             SortExec: fetch=1, expr=[c13@0 ASC NULLS LAST]
-              CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c13]
+              CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c13], has_header=true
 
 
 query ?
@@ -2087,7 +2087,7 @@ GlobalLimitExec: skip=0, fetch=5
             BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
               WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
                 SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST,c9@3 ASC NULLS LAST,c8@2 ASC NULLS LAST]
-                  CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c8, c9]
+                  CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c8, c9], has_header=true
 
 
 
@@ -2146,7 +2146,7 @@ ProjectionExec: expr=[c9@1 as c9, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER B
                 WindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: "SUM(t1.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
                   SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST,c9@3 ASC NULLS LAST,c8@2 ASC NULLS LAST]
                     ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c8@2 as c8, c9@3 as c9, c1@0 as c1_alias]
-                      CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c8, c9]
+                      CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c8, c9], has_header=true
 
 
 
@@ -2192,7 +2192,7 @@ ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2]
           ProjectionExec: expr=[c1@0 as c1, c9@2 as c9, c12@3 as c12, SUM(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as SUM(aggregate_test_100.c12)]
             BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c12): Ok(Field { name: "SUM(aggregate_test_100.c12)", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Groups, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }], mode=[Sorted]
               SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]
-                CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c9, c12]
+                CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c9, c12], has_header=true
 
 query RR
 SELECT SUM(c12) OVER(ORDER BY c1, c2 GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as sum1,
@@ -2228,7 +2228,7 @@ GlobalLimitExec: skip=0, fetch=5
   ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1]
     BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
       SortExec: expr=[c9@0 ASC NULLS LAST]
-        CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c9]
+        CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true
 
 query II
 SELECT c9, rn1 FROM (SELECT c9,
@@ -2267,7 +2267,7 @@ GlobalLimitExec: skip=0, fetch=5
   ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1]
     BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
       SortExec: expr=[c9@0 DESC]
-        CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c9]
+        CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true
 
 query II
 SELECT c9, rn1 FROM (SELECT c9,
@@ -2307,7 +2307,7 @@ GlobalLimitExec: skip=0, fetch=5
     ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1]
       BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
         SortExec: expr=[c9@0 DESC]
-          CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c9]
+          CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true
 
 query II
 SELECT c9, rn1 FROM (SELECT c9,
@@ -2350,7 +2350,7 @@ GlobalLimitExec: skip=0, fetch=5
     ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1]
       BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
         SortExec: expr=[c9@0 DESC]
-          CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c9]
+          CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true
 
 query II
 SELECT c9, rn1 FROM (SELECT c9,
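
Note on the operator text above: the change is display-only; scan behavior is unchanged. Below is a minimal, self-contained sketch of a formatter that produces the new `file_groups={1 group: [[...]]}` shape (the names here are illustrative, not the crate's actual types):

```rust
use std::fmt;

/// Illustrative only: renders nested file groups the way the new plan
/// text does, e.g. `{1 group: [[/tmp/table.csv]]}`.
struct FileGroupsDisplay<'a>(&'a [Vec<&'a str>]);

impl fmt::Display for FileGroupsDisplay<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let n = self.0.len();
        let word = if n == 1 { "group" } else { "groups" };
        write!(f, "{{{n} {word}: [")?;
        for (i, group) in self.0.iter().enumerate() {
            if i > 0 {
                write!(f, ", ")?;
            }
            // Each inner list is the set of files assigned to one partition.
            write!(f, "[{}]", group.join(", "))?;
        }
        write!(f, "]}}")
    }
}

fn main() {
    let groups = vec![vec!["/tmp/table.csv"]];
    // Prints: file_groups={1 group: [[/tmp/table.csv]]}
    println!("file_groups={}", FileGroupsDisplay(&groups));
}
```

Leading with the group count, then the nested per-partition file lists, keeps the scan line compact and consistent across the plan expectations above.
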
diff --git a/docs/source/user-guide/sql/explain.md b/docs/source/user-guide/sql/explain.md
index ca4169d577..b240b14eb5 100644
--- a/docs/source/user-guide/sql/explain.md
+++ b/docs/source/user-guide/sql/explain.md
@@ -44,7 +44,7 @@ EXPLAIN SELECT SUM(x) FROM table GROUP BY b;
 |               |       RepartitionExec: partitioning=Hash([Column { name: "b", index: 0 }], 16)                                                                                 |
 |               |         AggregateExec: mode=Partial, gby=[b@1 as b], aggr=[SUM(table.x)]                                                                                         |
 |               |           RepartitionExec: partitioning=RoundRobinBatch(16)                                                                                                    |
-|               |             CsvExec: source=Path(/tmp/table.csv: [/tmp/table.csv]), has_header=false, limit=None, projection=[x, b]                                            |
+|               |             CsvExec: file_groups={1 group: [[/tmp/table.csv]]}, projection=[x, b], has_header=false                                                            |
 |               |                                                                                                                                                                |
 +---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
 ```
@@ -66,6 +66,6 @@ EXPLAIN ANALYZE SELECT SUM(x) FROM table GROUP BY b;
 |                   |         RepartitionExec: partitioning=Hash([Column { name: "b", index: 0 }], 16), metrics=[sendTime=839560, fetchTime=122528525, repartitionTime=5327877] |
 |                   |           HashAggregateExec: mode=Partial, gby=[b@1 as b], aggr=[SUM(x)], metrics=[outputRows=2]                                                          |
 |                   |             RepartitionExec: partitioning=RoundRobinBatch(16), metrics=[fetchTime=5660489, repartitionTime=0, sendTime=8012]                              |
-|                   |               CsvExec: source=Path(/tmp/table.csv: [/tmp/table.csv]), has_header=false, metrics=[]                                                        |
+|                   |               CsvExec: file_groups={1 group: [[/tmp/table.csv]]}, has_header=false, metrics=[]                                                            |
 +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
 ```
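
The updated documentation example can be reproduced end to end with a small program. This is a sketch under stated assumptions: a CSV exists at `/tmp/table.csv` whose header row names columns `x` and `b`, it is registered here as `t`, and the exact plan shape depends on session settings such as `target_partitions`:

```rust
use datafusion::arrow::util::pretty::pretty_format_batches;
use datafusion::error::Result;
use datafusion::prelude::{CsvReadOptions, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Assumption: /tmp/table.csv exists and its header row is `x,b`.
    ctx.register_csv("t", "/tmp/table.csv", CsvReadOptions::new())
        .await?;
    let batches = ctx
        .sql("EXPLAIN SELECT SUM(x) FROM t GROUP BY b")
        .await?
        .collect()
        .await?;
    // With this change, the scan line of the physical plan should read like
    //   CsvExec: file_groups={1 group: [[/tmp/table.csv]]}, projection=[x, b], has_header=true
    println!("{}", pretty_format_batches(&batches)?);
    Ok(())
}
```
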