You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/01/30 19:18:32 UTC

[arrow-datafusion] branch master updated: Parquet parallel scan (#5057)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 67b1da8a7 Parquet parallel scan (#5057)
67b1da8a7 is described below

commit 67b1da8a71397185a49a6418af3e1cfe9329f6c5
Author: Eduard Karacharov <13...@users.noreply.github.com>
AuthorDate: Mon Jan 30 22:18:25 2023 +0300

    Parquet parallel scan (#5057)
    
    * parallel parquet scanning
    
    * repartitioning ParquetExec
    
    * minor changes & review comments
    
    * settings reorganized
    
    * Apply suggestions from code review
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
    
    * additional test case & updated docs
    
    ---------
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
 datafusion/common/src/config.rs                    |   9 +
 datafusion/core/src/execution/context.rs           |  12 +
 .../core/src/physical_optimizer/repartition.rs     | 261 ++++++++++++++++-
 .../core/src/physical_plan/file_format/mod.rs      |   4 +
 .../core/src/physical_plan/file_format/parquet.rs  | 317 +++++++++++++++++++++
 .../test_files/information_schema.slt              |   2 +
 docs/source/user-guide/configs.md                  |  66 ++---
 7 files changed, 630 insertions(+), 41 deletions(-)

diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 3d67ddb7e..86b78aeac 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -261,10 +261,19 @@ config_namespace! {
         /// in parallel using the provided `target_partitions` level"
         pub repartition_aggregations: bool, default = true
 
+        /// Minimum total files size in bytes to perform file scan repartitioning.
+        pub repartition_file_min_size: usize, default = 10 * 1024 * 1024
+
         /// Should DataFusion repartition data using the join keys to execute joins in parallel
         /// using the provided `target_partitions` level"
         pub repartition_joins: bool, default = true
 
+        /// When set to true, file groups will be repartitioned to achieve maximum parallelism.
+        /// Currently supported only for Parquet format in which case
+        /// multiple row groups from the same file may be read concurrently. If false then each
+        /// row group is read serially, though different files may be read in parallel.
+        pub repartition_file_scans: bool, default = false
+
         /// Should DataFusion repartition data using the partitions keys to execute window
         /// functions in parallel using the provided `target_partitions` level"
         pub repartition_windows: bool, default = true
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 99a49d04d..9d4d8367a 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -1251,6 +1251,18 @@ impl SessionConfig {
         self
     }
 
+    /// Sets the minimum total file size (in bytes) required to repartition file scans
+    pub fn with_repartition_file_min_size(mut self, size: usize) -> Self {
+        self.options.optimizer.repartition_file_min_size = size;
+        self
+    }
+
+    /// Enables or disables the use of repartitioning for file scans
+    pub fn with_repartition_file_scans(mut self, enabled: bool) -> Self {
+        self.options.optimizer.repartition_file_scans = enabled;
+        self
+    }
+
     /// Enables or disables the use of repartitioning for window functions to improve parallelism
     pub fn with_repartition_windows(mut self, enabled: bool) -> Self {
         self.options.optimizer.repartition_windows = enabled;
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index 1285b9089..5a3af7bbc 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -23,7 +23,8 @@ use crate::config::ConfigOptions;
 use crate::error::Result;
 use crate::physical_plan::Partitioning::*;
 use crate::physical_plan::{
-    repartition::RepartitionExec, with_new_children_if_necessary, ExecutionPlan,
+    file_format::ParquetExec, repartition::RepartitionExec,
+    with_new_children_if_necessary, ExecutionPlan,
 };
 
 /// Optimizer that introduces repartition to introduce more
@@ -167,6 +168,8 @@ fn optimize_partitions(
     is_root: bool,
     can_reorder: bool,
     would_benefit: bool,
+    repartition_file_scans: bool,
+    repartition_file_min_size: usize,
 ) -> Result<Arc<dyn ExecutionPlan>> {
     // Recurse into children bottom-up (attempt to repartition as
     // early as possible)
@@ -199,6 +202,8 @@ fn optimize_partitions(
                     false, // child is not root
                     can_reorder_child,
                     plan.benefits_from_input_partitioning(),
+                    repartition_file_scans,
+                    repartition_file_min_size,
                 )
             })
             .collect::<Result<_>>()?;
@@ -227,14 +232,28 @@ fn optimize_partitions(
         could_repartition = false;
     }
 
-    if would_benefit && could_repartition && can_reorder {
-        Ok(Arc::new(RepartitionExec::try_new(
-            new_plan,
-            RoundRobinBatch(target_partitions),
-        )?))
-    } else {
-        Ok(new_plan)
+    let repartition_allowed = would_benefit && could_repartition && can_reorder;
+
+    // If repartition is not allowed - return plan as it is
+    if !repartition_allowed {
+        return Ok(new_plan);
+    }
+
+    // For ParquetExec return internally repartitioned version of the plan in case `repartition_file_scans` is set
+    if let Some(parquet_exec) = new_plan.as_any().downcast_ref::<ParquetExec>() {
+        if repartition_file_scans {
+            return Ok(Arc::new(
+                parquet_exec
+                    .get_repartitioned(target_partitions, repartition_file_min_size),
+            ));
+        }
     }
+
+    // Otherwise - return plan wrapped up in RepartitionExec
+    Ok(Arc::new(RepartitionExec::try_new(
+        new_plan,
+        RoundRobinBatch(target_partitions),
+    )?))
 }
 
 /// Returns true if `plan` requires any of inputs to be sorted in some
@@ -253,6 +272,8 @@ impl PhysicalOptimizerRule for Repartition {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let target_partitions = config.execution.target_partitions;
         let enabled = config.optimizer.enable_round_robin_repartition;
+        let repartition_file_scans = config.optimizer.repartition_file_scans;
+        let repartition_file_min_size = config.optimizer.repartition_file_min_size;
         // Don't run optimizer if target_partitions == 1
         if !enabled || target_partitions == 1 {
             Ok(plan)
@@ -266,6 +287,8 @@ impl PhysicalOptimizerRule for Repartition {
                 is_root,
                 can_reorder,
                 would_benefit,
+                repartition_file_scans,
+                repartition_file_min_size,
             )
         }
     }
@@ -331,6 +354,28 @@ mod tests {
         ))
     }
 
+    /// Create a non sorted parquet exec over two files / partitions
+    fn parquet_exec_two_partitions() -> Arc<ParquetExec> {
+        Arc::new(ParquetExec::new(
+            FileScanConfig {
+                object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
+                file_schema: schema(),
+                file_groups: vec![
+                    vec![PartitionedFile::new("x".to_string(), 100)],
+                    vec![PartitionedFile::new("y".to_string(), 200)],
+                ],
+                statistics: Statistics::default(),
+                projection: None,
+                limit: None,
+                table_partition_cols: vec![],
+                output_ordering: None,
+                infinite_source: false,
+            },
+            None,
+            None,
+        ))
+    }
+
     // Created a sorted parquet exec
     fn parquet_exec_sorted() -> Arc<ParquetExec> {
         let sort_exprs = vec![PhysicalSortExpr {
@@ -448,10 +493,16 @@ mod tests {
     /// Runs the repartition optimizer and asserts the plan against the expected
     macro_rules! assert_optimized {
         ($EXPECTED_LINES: expr, $PLAN: expr) => {
+            assert_optimized!($EXPECTED_LINES, $PLAN, 10, false, 1024);
+        };
+
+        ($EXPECTED_LINES: expr, $PLAN: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr) => {
             let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect();
 
             let mut config = ConfigOptions::new();
-            config.execution.target_partitions = 10;
+            config.execution.target_partitions = $TARGET_PARTITIONS;
+            config.optimizer.repartition_file_scans = $REPARTITION_FILE_SCANS;
+            config.optimizer.repartition_file_min_size = $REPARTITION_FILE_MIN_SIZE;
 
             // run optimizer
             let optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
@@ -846,6 +897,198 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn parallelization_single_partition() -> Result<()> {
+        let plan = aggregate(parquet_exec());
+
+        let expected = [
+            "AggregateExec: mode=Final, gby=[], aggr=[]",
+            "CoalescePartitionsExec",
+            "AggregateExec: mode=Partial, gby=[], aggr=[]",
+            "ParquetExec: limit=None, partitions={2 groups: [[x:0..50], [x:50..100]]}, projection=[c1]",
+        ];
+
+        assert_optimized!(expected, plan, 2, true, 10);
+        Ok(())
+    }
+
+    #[test]
+    fn parallelization_two_partitions() -> Result<()> {
+        let plan = aggregate(parquet_exec_two_partitions());
+
+        let expected = [
+            "AggregateExec: mode=Final, gby=[], aggr=[]",
+            "CoalescePartitionsExec",
+            "AggregateExec: mode=Partial, gby=[], aggr=[]",
+            // Plan already has two partitions
+            "ParquetExec: limit=None, partitions={2 groups: [[x], [y]]}, projection=[c1]",
+        ];
+
+        assert_optimized!(expected, plan, 2, true, 10);
+        Ok(())
+    }
+
+    #[test]
+    fn parallelization_two_partitions_into_four() -> Result<()> {
+        let plan = aggregate(parquet_exec_two_partitions());
+
+        let expected = [
+            "AggregateExec: mode=Final, gby=[], aggr=[]",
+            "CoalescePartitionsExec",
+            "AggregateExec: mode=Partial, gby=[], aggr=[]",
+            // Multiple source files split across partitions
+            "ParquetExec: limit=None, partitions={4 groups: [[x:0..75], [x:75..100, y:0..50], [y:50..125], [y:125..200]]}, projection=[c1]",
+        ];
+
+        assert_optimized!(expected, plan, 4, true, 10);
+        Ok(())
+    }
+
+    #[test]
+    fn parallelization_sorted_limit() -> Result<()> {
+        let plan = limit_exec(sort_exec(parquet_exec(), false));
+
+        let expected = &[
+            "GlobalLimitExec: skip=0, fetch=100",
+            "LocalLimitExec: fetch=100",
+            // data is sorted so can't repartition here
+            "SortExec: [c1@0 ASC]",
+            // Doesn't parallelize for SortExec without preserve_partitioning
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+        ];
+
+        assert_optimized!(expected, plan, 2, true, 10);
+        Ok(())
+    }
+
+    #[test]
+    fn parallelization_limit_with_filter() -> Result<()> {
+        let plan = limit_exec(filter_exec(sort_exec(parquet_exec(), false)));
+
+        let expected = &[
+            "GlobalLimitExec: skip=0, fetch=100",
+            "LocalLimitExec: fetch=100",
+            "FilterExec: c1@0",
+            // data is sorted so can't repartition here even though
+            // filter would benefit from parallelism, the answers might be wrong
+            "SortExec: [c1@0 ASC]",
+            // SortExec doesn't benefit from input partitioning
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+        ];
+
+        assert_optimized!(expected, plan, 2, true, 10);
+        Ok(())
+    }
+
+    #[test]
+    fn parallelization_ignores_limit() -> Result<()> {
+        let plan = aggregate(limit_exec(filter_exec(limit_exec(parquet_exec()))));
+
+        let expected = &[
+            "AggregateExec: mode=Final, gby=[], aggr=[]",
+            "CoalescePartitionsExec",
+            "AggregateExec: mode=Partial, gby=[], aggr=[]",
+            "RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
+            "GlobalLimitExec: skip=0, fetch=100",
+            "CoalescePartitionsExec",
+            "LocalLimitExec: fetch=100",
+            "FilterExec: c1@0",
+            // repartition should happen prior to the filter to maximize parallelism
+            "RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
+            "GlobalLimitExec: skip=0, fetch=100",
+            // Limit doesn't benefit from input partitioning - no parallelism
+            "LocalLimitExec: fetch=100",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+        ];
+
+        assert_optimized!(expected, plan, 2, true, 10);
+        Ok(())
+    }
+
+    #[test]
+    fn parallelization_union_inputs() -> Result<()> {
+        let plan = union_exec(vec![parquet_exec(); 5]);
+
+        let expected = &[
+            "UnionExec",
+            // Union doesn't benefit from input partitioning - no parallelism
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+        ];
+
+        assert_optimized!(expected, plan, 2, true, 10);
+        Ok(())
+    }
+
+    #[test]
+    fn parallelization_prior_to_sort_preserving_merge() -> Result<()> {
+        // sort preserving merge already sorted input,
+        let plan = sort_preserving_merge_exec(parquet_exec_sorted());
+
+        // parallelization potentially could break sort order
+        let expected = &[
+            "SortPreservingMergeExec: [c1@0 ASC]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
+        ];
+
+        assert_optimized!(expected, plan, 2, true, 10);
+        Ok(())
+    }
+
+    #[test]
+    fn parallelization_sort_preserving_merge_with_union() -> Result<()> {
+        // 2 sorted parquet files unioned (partitions are concatenated, sort is preserved)
+        let input = union_exec(vec![parquet_exec_sorted(); 2]);
+        let plan = sort_preserving_merge_exec(input);
+
+        // should not repartition / sort (as the data was already sorted)
+        let expected = &[
+            "SortPreservingMergeExec: [c1@0 ASC]",
+            "UnionExec",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
+        ];
+
+        assert_optimized!(expected, plan, 2, true, 10);
+        Ok(())
+    }
+
+    #[test]
+    fn parallelization_does_not_destroy_sort() -> Result<()> {
+        //  SortRequired
+        //    Parquet(sorted)
+
+        let plan = sort_required_exec(parquet_exec_sorted());
+
+        // no parallelization to preserve sort order
+        let expected = &[
+            "SortRequiredExec",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
+        ];
+
+        assert_optimized!(expected, plan, 2, true, 10);
+        Ok(())
+    }
+
+    #[test]
+    fn parallelization_ignores_transitively_with_projection() -> Result<()> {
+        // sorted input
+        let plan = sort_preserving_merge_exec(projection_exec(parquet_exec_sorted()));
+
+        // data should not be repartitioned / resorted
+        let expected = &[
+            "SortPreservingMergeExec: [c1@0 ASC]",
+            "ProjectionExec: expr=[c1@0 as c1]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
+        ];
+
+        assert_optimized!(expected, plan, 2, true, 10);
+        Ok(())
+    }
+
     /// Models operators like BoundedWindowExec that require an input
     /// ordering but is easy to construct
     #[derive(Debug)]
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index 8f2bae744..78828cbf8 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -207,6 +207,10 @@ impl<'a> Display for FileGroupsDisplay<'a> {
                 first_file = false;
 
                 write!(f, "{}", pf.object_meta.location.as_ref())?;
+
+                if let Some(range) = pf.range.as_ref() {
+                    write!(f, ":{}..{}", range.start, range.end)?;
+                }
             }
             write!(f, "]")?;
         }
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 5e81602fe..d72da7574 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -20,6 +20,7 @@
 use arrow::datatypes::{DataType, SchemaRef};
 use fmt::Debug;
 use std::any::Any;
+use std::cmp::min;
 use std::fmt;
 use std::fs;
 use std::ops::Range;
@@ -32,6 +33,7 @@ use crate::physical_plan::file_format::file_stream::{
 };
 use crate::physical_plan::file_format::FileMeta;
 use crate::{
+    datasource::listing::FileRange,
     error::{DataFusionError, Result},
     execution::context::{SessionState, TaskContext},
     physical_optimizer::pruning::PruningPredicate,
@@ -48,6 +50,7 @@ use bytes::Bytes;
 use datafusion_expr::Expr;
 use futures::future::BoxFuture;
 use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
+use itertools::Itertools;
 use log::debug;
 use object_store::{ObjectMeta, ObjectStore};
 use parquet::arrow::arrow_reader::ArrowReaderOptions;
@@ -68,6 +71,12 @@ pub use metrics::ParquetFileMetrics;
 
 use super::get_output_ordering;
 
+#[derive(Default)]
+struct RepartitionState {
+    current_partition_index: usize,
+    current_partition_size: usize,
+}
+
 /// Execution plan for scanning one or more Parquet partitions
 #[derive(Debug, Clone)]
 pub struct ParquetExec {
@@ -232,6 +241,78 @@ impl ParquetExec {
         self.enable_page_index
             .unwrap_or(config_options.execution.parquet.enable_page_index)
     }
+
+    /// Redistribute files across partitions according to their size
+    pub fn get_repartitioned(
+        &self,
+        target_partitions: usize,
+        repartition_file_min_size: usize,
+    ) -> Self {
+        let flattened_files = self
+            .base_config()
+            .file_groups
+            .iter()
+            .flatten()
+            .collect::<Vec<_>>();
+
+        // Perform redistribution only in case all files should be read from beginning to end
+        let has_ranges = flattened_files.iter().any(|f| f.range.is_some());
+        if has_ranges {
+            return self.clone();
+        }
+
+        let total_size = flattened_files
+            .iter()
+            .map(|f| f.object_meta.size as i64)
+            .sum::<i64>();
+        if total_size < (repartition_file_min_size as i64) {
+            return self.clone();
+        }
+
+        let target_partition_size =
+            (total_size as usize + (target_partitions) - 1) / (target_partitions);
+
+        let repartitioned_files = flattened_files
+            .into_iter()
+            .scan(RepartitionState::default(), |state, source_file| {
+                let mut produced_files = vec![];
+                let mut range_start = 0;
+                while range_start < source_file.object_meta.size {
+                    let range_end = min(
+                        range_start
+                            + (target_partition_size - state.current_partition_size),
+                        source_file.object_meta.size,
+                    );
+
+                    let mut produced_file = source_file.clone();
+                    produced_file.range = Some(FileRange {
+                        start: range_start as i64,
+                        end: range_end as i64,
+                    });
+                    produced_files.push((state.current_partition_index, produced_file));
+
+                    if state.current_partition_size + (range_end - range_start)
+                        >= target_partition_size
+                    {
+                        state.current_partition_index += 1;
+                        state.current_partition_size = 0;
+                    } else {
+                        state.current_partition_size += range_end - range_start;
+                    }
+                    range_start = range_end;
+                }
+                Some(produced_files)
+            })
+            .flatten()
+            .group_by(|(partition_idx, _)| *partition_idx)
+            .into_iter()
+            .map(|(_, group)| group.map(|(_, vals)| vals).collect_vec())
+            .collect_vec();
+
+        let mut new_parquet_exec = self.clone();
+        new_parquet_exec.base_config.file_groups = repartitioned_files;
+        new_parquet_exec
+    }
 }
 
 impl ExecutionPlan for ParquetExec {
@@ -1646,6 +1727,242 @@ mod tests {
         assert_eq!(predicate.unwrap().as_ref(), &filter);
     }
 
+    #[tokio::test]
+    async fn parquet_exec_repartition_single_file() {
+        // Single file, single partition into multiple partitions
+        let partitioned_file = PartitionedFile::new("a".to_string(), 123);
+        let single_partition = vec![vec![partitioned_file]];
+        let parquet_exec = ParquetExec::new(
+            FileScanConfig {
+                object_store_url: ObjectStoreUrl::local_filesystem(),
+                file_groups: single_partition,
+                file_schema: Arc::new(Schema::empty()),
+                statistics: Statistics::default(),
+                projection: None,
+                limit: None,
+                table_partition_cols: vec![],
+                output_ordering: None,
+                infinite_source: false,
+            },
+            None,
+            None,
+        );
+
+        let actual = file_groups_to_vec(
+            parquet_exec
+                .get_repartitioned(4, 10)
+                .base_config()
+                .file_groups
+                .clone(),
+        );
+        let expected = vec![
+            (0, "a".to_string(), 0, 31),
+            (1, "a".to_string(), 31, 62),
+            (2, "a".to_string(), 62, 93),
+            (3, "a".to_string(), 93, 123),
+        ];
+        assert_eq!(expected, actual);
+    }
+
+    #[tokio::test]
+    async fn parquet_exec_repartition_too_much_partitions() {
+        // Single file, single partition into 96 partitions
+        let partitioned_file = PartitionedFile::new("a".to_string(), 8);
+        let single_partition = vec![vec![partitioned_file]];
+        let parquet_exec = ParquetExec::new(
+            FileScanConfig {
+                object_store_url: ObjectStoreUrl::local_filesystem(),
+                file_groups: single_partition,
+                file_schema: Arc::new(Schema::empty()),
+                statistics: Statistics::default(),
+                projection: None,
+                limit: None,
+                table_partition_cols: vec![],
+                output_ordering: None,
+                infinite_source: false,
+            },
+            None,
+            None,
+        );
+
+        let actual = file_groups_to_vec(
+            parquet_exec
+                .get_repartitioned(96, 5)
+                .base_config()
+                .file_groups
+                .clone(),
+        );
+        let expected = vec![
+            (0, "a".to_string(), 0, 1),
+            (1, "a".to_string(), 1, 2),
+            (2, "a".to_string(), 2, 3),
+            (3, "a".to_string(), 3, 4),
+            (4, "a".to_string(), 4, 5),
+            (5, "a".to_string(), 5, 6),
+            (6, "a".to_string(), 6, 7),
+            (7, "a".to_string(), 7, 8),
+        ];
+        assert_eq!(expected, actual);
+    }
+
+    #[tokio::test]
+    async fn parquet_exec_repartition_multiple_partitions() {
+        // Multiple files in single partition after redistribution
+        let partitioned_file_1 = PartitionedFile::new("a".to_string(), 40);
+        let partitioned_file_2 = PartitionedFile::new("b".to_string(), 60);
+        let source_partitions = vec![vec![partitioned_file_1], vec![partitioned_file_2]];
+        let parquet_exec = ParquetExec::new(
+            FileScanConfig {
+                object_store_url: ObjectStoreUrl::local_filesystem(),
+                file_groups: source_partitions,
+                file_schema: Arc::new(Schema::empty()),
+                statistics: Statistics::default(),
+                projection: None,
+                limit: None,
+                table_partition_cols: vec![],
+                output_ordering: None,
+                infinite_source: false,
+            },
+            None,
+            None,
+        );
+
+        let actual = file_groups_to_vec(
+            parquet_exec
+                .get_repartitioned(3, 10)
+                .base_config()
+                .file_groups
+                .clone(),
+        );
+        let expected = vec![
+            (0, "a".to_string(), 0, 34),
+            (1, "a".to_string(), 34, 40),
+            (1, "b".to_string(), 0, 28),
+            (2, "b".to_string(), 28, 60),
+        ];
+        assert_eq!(expected, actual);
+    }
+
+    #[tokio::test]
+    async fn parquet_exec_repartition_same_num_partitions() {
+        // "Rebalance" files across partitions
+        let partitioned_file_1 = PartitionedFile::new("a".to_string(), 40);
+        let partitioned_file_2 = PartitionedFile::new("b".to_string(), 60);
+        let source_partitions = vec![vec![partitioned_file_1], vec![partitioned_file_2]];
+        let parquet_exec = ParquetExec::new(
+            FileScanConfig {
+                object_store_url: ObjectStoreUrl::local_filesystem(),
+                file_groups: source_partitions,
+                file_schema: Arc::new(Schema::empty()),
+                statistics: Statistics::default(),
+                projection: None,
+                limit: None,
+                table_partition_cols: vec![],
+                output_ordering: None,
+                infinite_source: false,
+            },
+            None,
+            None,
+        );
+
+        let actual = file_groups_to_vec(
+            parquet_exec
+                .get_repartitioned(2, 10)
+                .base_config()
+                .file_groups
+                .clone(),
+        );
+        let expected = vec![
+            (0, "a".to_string(), 0, 40),
+            (0, "b".to_string(), 0, 10),
+            (1, "b".to_string(), 10, 60),
+        ];
+        assert_eq!(expected, actual);
+    }
+
+    #[tokio::test]
+    async fn parquet_exec_repartition_no_action_ranges() {
+        // No action due to Some(range) in second file
+        let partitioned_file_1 = PartitionedFile::new("a".to_string(), 123);
+        let mut partitioned_file_2 = PartitionedFile::new("b".to_string(), 144);
+        partitioned_file_2.range = Some(FileRange { start: 1, end: 50 });
+
+        let source_partitions = vec![vec![partitioned_file_1], vec![partitioned_file_2]];
+        let parquet_exec = ParquetExec::new(
+            FileScanConfig {
+                object_store_url: ObjectStoreUrl::local_filesystem(),
+                file_groups: source_partitions,
+                file_schema: Arc::new(Schema::empty()),
+                statistics: Statistics::default(),
+                projection: None,
+                limit: None,
+                table_partition_cols: vec![],
+                output_ordering: None,
+                infinite_source: false,
+            },
+            None,
+            None,
+        );
+
+        let actual = parquet_exec
+            .get_repartitioned(65, 10)
+            .base_config()
+            .file_groups
+            .clone();
+        assert_eq!(2, actual.len());
+    }
+
+    #[tokio::test]
+    async fn parquet_exec_repartition_no_action_min_size() {
+        // No action: total file size (123) is below repartition_file_min_size (500)
+        let partitioned_file = PartitionedFile::new("a".to_string(), 123);
+        let single_partition = vec![vec![partitioned_file]];
+        let parquet_exec = ParquetExec::new(
+            FileScanConfig {
+                object_store_url: ObjectStoreUrl::local_filesystem(),
+                file_groups: single_partition,
+                file_schema: Arc::new(Schema::empty()),
+                statistics: Statistics::default(),
+                projection: None,
+                limit: None,
+                table_partition_cols: vec![],
+                output_ordering: None,
+                infinite_source: false,
+            },
+            None,
+            None,
+        );
+
+        let actual = parquet_exec
+            .get_repartitioned(65, 500)
+            .base_config()
+            .file_groups
+            .clone();
+        assert_eq!(1, actual.len());
+    }
+
+    fn file_groups_to_vec(
+        file_groups: Vec<Vec<PartitionedFile>>,
+    ) -> Vec<(usize, String, i64, i64)> {
+        file_groups
+            .iter()
+            .enumerate()
+            .flat_map(|(part_idx, files)| {
+                files
+                    .iter()
+                    .map(|f| {
+                        (
+                            part_idx,
+                            f.object_meta.location.to_string(),
+                            f.range.as_ref().unwrap().start,
+                            f.range.as_ref().unwrap().end,
+                        )
+                    })
+                    .collect_vec()
+            })
+            .collect_vec()
+    }
+
     /// returns the sum of all the metrics with the specified name
     /// the returned set.
     ///
diff --git a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
index b1e8733c8..bd796ed71 100644
--- a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
@@ -131,6 +131,8 @@ datafusion.optimizer.hash_join_single_partition_threshold 1048576
 datafusion.optimizer.max_passes 3
 datafusion.optimizer.prefer_hash_join true
 datafusion.optimizer.repartition_aggregations true
+datafusion.optimizer.repartition_file_min_size 10485760
+datafusion.optimizer.repartition_file_scans false
 datafusion.optimizer.repartition_joins true
 datafusion.optimizer.repartition_windows true
 datafusion.optimizer.skip_failed_rules true
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 7f502f613..cb8478f44 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -35,35 +35,37 @@ Values are parsed according to the [same rules used in casts from Utf8](https://
 If the value in the environment variable cannot be cast to the type of the configuration option, the default value will be used instead and a warning emitted.
 Environment variables are read during `SessionConfig` initialisation so they must be set beforehand and will not affect running sessions.
 
-| key                                                       | default    | description                                                                                                                                                                                                                                                                                                |
-| --------------------------------------------------------- | ---------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
-| datafusion.catalog.create_default_catalog_and_schema      | true       | Whether the default catalog and schema should be created automatically.                                                                                                                                                                                                                                    |
-| datafusion.catalog.default_catalog                        | datafusion | The default catalog name - this impacts what SQL queries use if not specified                                                                                                                                                                                                                              |
-| datafusion.catalog.default_schema                         | public     | The default schema name - this impacts what SQL queries use if not specified                                                                                                                                                                                                                               |
-| datafusion.catalog.information_schema                     | false      | Should DataFusion provide access to `information_schema` virtual tables for displaying schema information                                                                                                                                                                                                  |
-| datafusion.catalog.location                               | NULL       | Location scanned to load tables for `default` schema                                                                                                                                                                                                                                                       |
-| datafusion.catalog.format                                 | NULL       | Type of `TableProvider` to use when loading `default` schema                                                                                                                                                                                                                                               |
-| datafusion.catalog.has_header                             | false      | If the file has a header                                                                                                                                                                                                                                                                                   |
-| datafusion.execution.batch_size                           | 8192       | Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would results in too much metadata memory consumption                                                                                                                       |
-| datafusion.execution.coalesce_batches                     | true       | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting |
-| datafusion.execution.collect_statistics                   | false      | Should DataFusion collect statistics after listing files                                                                                                                                                                                                                                                   |
-| datafusion.execution.target_partitions                    | 0          | Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of cpu cores on the system                                                                                                                                                                |
-| datafusion.execution.time_zone                            | +00:00     | The default time zone Some functions, e.g. EXTRACT(HOUR from SOME_TIME), shift the underlying datetime according to this time zone, and then extract the hour                                                                                                                                              |
-| datafusion.execution.parquet.enable_page_index            | false      | If true, uses parquet data page level metadata (Page Index) statistics to reduce the number of rows decoded.                                                                                                                                                                                               |
-| datafusion.execution.parquet.pruning                      | true       | If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file                                                                                                                                            |
-| datafusion.execution.parquet.skip_metadata                | true       | If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata                                                          |
-| datafusion.execution.parquet.metadata_size_hint           | NULL       | If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two read are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer                                     |
-| datafusion.execution.parquet.pushdown_filters             | false      | If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded                                                                                                                                                                                      |
-| datafusion.execution.parquet.reorder_filters              | false      | If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query                                                                                |
-| datafusion.optimizer.enable_round_robin_repartition       | true       | When set to true, the physical plan optimizer will try to add round robin repartition to increase parallelism to leverage more CPU cores                                                                                                                                                                   |
-| datafusion.optimizer.filter_null_join_keys                | false      | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down.                                            |
-| datafusion.optimizer.repartition_aggregations             | true       | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level"                                                                                                                                                                |
-| datafusion.optimizer.repartition_joins                    | true       | Should DataFusion repartition data using the join keys to execute joins in parallel using the provided `target_partitions` level"                                                                                                                                                                          |
-| datafusion.optimizer.repartition_windows                  | true       | Should DataFusion repartition data using the partitions keys to execute window functions in parallel using the provided `target_partitions` level"                                                                                                                                                         |
-| datafusion.optimizer.skip_failed_rules                    | true       | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail                                                                       |
-| datafusion.optimizer.max_passes                           | 3          | Number of times that the optimizer will attempt to optimize the plan                                                                                                                                                                                                                                       |
-| datafusion.optimizer.top_down_join_key_reordering         | true       | When set to true, the physical plan optimizer will run a top down process to reorder the join keys                                                                                                                                                                                                         |
-| datafusion.optimizer.prefer_hash_join                     | true       | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory                                                                                                                                      |
-| datafusion.optimizer.hash_join_single_partition_threshold | 1048576    | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition                                                                                                                                                                                             |
-| datafusion.explain.logical_plan_only                      | false      | When set to true, the explain statement will only print logical plans                                                                                                                                                                                                                                      |
-| datafusion.explain.physical_plan_only                     | false      | When set to true, the explain statement will only print physical plans                                                                                                                                                                                                                                     |
+| key                                                       | default    | description                                                                                                                                                                                                                                                                                                   |
+| --------------------------------------------------------- | ---------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
+| datafusion.catalog.create_default_catalog_and_schema      | true       | Whether the default catalog and schema should be created automatically.                                                                                                                                                                                                                                       |
+| datafusion.catalog.default_catalog                        | datafusion | The default catalog name - this impacts what SQL queries use if not specified                                                                                                                                                                                                                                 |
+| datafusion.catalog.default_schema                         | public     | The default schema name - this impacts what SQL queries use if not specified                                                                                                                                                                                                                                  |
+| datafusion.catalog.information_schema                     | false      | Should DataFusion provide access to `information_schema` virtual tables for displaying schema information                                                                                                                                                                                                     |
+| datafusion.catalog.location                               | NULL       | Location scanned to load tables for `default` schema                                                                                                                                                                                                                                                          |
+| datafusion.catalog.format                                 | NULL       | Type of `TableProvider` to use when loading `default` schema                                                                                                                                                                                                                                                  |
+| datafusion.catalog.has_header                             | false      | If the file has a header                                                                                                                                                                                                                                                                                      |
+| datafusion.execution.batch_size                           | 8192       | Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption                                                                                                                           |
+| datafusion.execution.coalesce_batches                     | true       | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting    |
+| datafusion.execution.collect_statistics                   | false      | Should DataFusion collect statistics after listing files                                                                                                                                                                                                                                                      |
+| datafusion.execution.target_partitions                    | 0          | Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of cpu cores on the system                                                                                                                                                                   |
+| datafusion.execution.time_zone                            | +00:00     | The default time zone. Some functions, e.g. EXTRACT(HOUR from SOME_TIME), shift the underlying datetime according to this time zone, and then extract the hour                                                                                                                                                |
+| datafusion.execution.parquet.enable_page_index            | false      | If true, uses parquet data page level metadata (Page Index) statistics to reduce the number of rows decoded.                                                                                                                                                                                                  |
+| datafusion.execution.parquet.pruning                      | true       | If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file                                                                                                                                               |
+| datafusion.execution.parquet.skip_metadata                | true       | If true, the parquet reader skips the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata                                                            |
+| datafusion.execution.parquet.metadata_size_hint           | NULL       | If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer                                       |
+| datafusion.execution.parquet.pushdown_filters             | false      | If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded                                                                                                                                                                                         |
+| datafusion.execution.parquet.reorder_filters              | false      | If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query                                                                                   |
+| datafusion.optimizer.enable_round_robin_repartition       | true       | When set to true, the physical plan optimizer will try to add round robin repartition to increase parallelism to leverage more CPU cores                                                                                                                                                                      |
+| datafusion.optimizer.filter_null_join_keys                | false      | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down.                                               |
+| datafusion.optimizer.repartition_aggregations             | true       | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level"                                                                                                                                                                   |
+| datafusion.optimizer.repartition_file_min_size            | 10485760   | Minimum total files size in bytes to perform file scan repartitioning.                                                                                                                                                                                                                                        |
+| datafusion.optimizer.repartition_joins                    | true       | Should DataFusion repartition data using the join keys to execute joins in parallel using the provided `target_partitions` level"                                                                                                                                                                             |
+| datafusion.optimizer.repartition_file_scans               | false      | When set to true, file groups will be repartitioned to achieve maximum parallelism. Currently supported only for Parquet format in which case multiple row groups from the same file may be read concurrently. If false then each row group is read serially, though different files may be read in parallel. |
+| datafusion.optimizer.repartition_windows                  | true       | Should DataFusion repartition data using the partitions keys to execute window functions in parallel using the provided `target_partitions` level"                                                                                                                                                            |
+| datafusion.optimizer.skip_failed_rules                    | true       | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail                                                                          |
+| datafusion.optimizer.max_passes                           | 3          | Number of times that the optimizer will attempt to optimize the plan                                                                                                                                                                                                                                          |
+| datafusion.optimizer.top_down_join_key_reordering         | true       | When set to true, the physical plan optimizer will run a top down process to reorder the join keys                                                                                                                                                                                                            |
+| datafusion.optimizer.prefer_hash_join                     | true       | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory                                                                                                                                         |
+| datafusion.optimizer.hash_join_single_partition_threshold | 1048576    | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition                                                                                                                                                                                                |
+| datafusion.explain.logical_plan_only                      | false      | When set to true, the explain statement will only print logical plans                                                                                                                                                                                                                                         |
+| datafusion.explain.physical_plan_only                     | false      | When set to true, the explain statement will only print physical plans                                                                                                                                                                                                                                        |