Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/12/28 16:27:40 UTC

[GitHub] [arrow] andygrove opened a new pull request #9029: ARROW-10995: Limit ParquetExec concurrency when reading large number of files

andygrove opened a new pull request #9029:
URL: https://github.com/apache/arrow/pull/9029


   Before this PR, we were launching one thread per Parquet file in ParquetExec. This works well when the number of files is small but does not scale well when there are hundreds of files. This PR respects the context's `max_concurrency` configuration option and uses it to split the files into one list per partition/thread.
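
   For illustration, here is a minimal sketch of how such a per-partition split might work. A `split_files` helper does appear in the diff below, but this implementation (contiguous chunks via ceiling division) is an assumption, not necessarily the PR's exact code:

       /// Hypothetical sketch: split filenames into at most `n` contiguous
       /// chunks, one per partition/thread.
       fn split_files(filenames: &[String], n: usize) -> Vec<&[String]> {
           let n = n.max(1); // guard against a zero concurrency setting
           // ceiling division so the last partial chunk is not dropped
           let chunk_size = ((filenames.len() + n - 1) / n).max(1);
           filenames.chunks(chunk_size).collect()
       }

       fn main() {
           let files: Vec<String> = (0..10).map(|i| format!("part-{}.parquet", i)).collect();
           let chunks = split_files(&files, 4);
           assert!(chunks.len() <= 4);
           for (i, chunk) in chunks.iter().enumerate() {
               println!("partition {}: {:?}", i, chunk);
           }
       }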


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove commented on a change in pull request #9029: ARROW-10995: [Rust] [DataFusion] Limit ParquetExec concurrency when reading large number of files

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #9029:
URL: https://github.com/apache/arrow/pull/9029#discussion_r549523454



##########
File path: rust/datafusion/src/physical_plan/parquet.rs
##########
@@ -104,35 +105,41 @@ impl ParquetExec {
         filenames: &[&str],
         projection: Option<Vec<usize>>,
         batch_size: usize,
+        max_concurrency: usize,
     ) -> Result<Self> {
         // build a list of Parquet partitions with statistics and gather all unique schemas
         // used in this data set
         let mut schemas: Vec<Schema> = vec![];
-        let mut partitions = Vec::with_capacity(filenames.len());
-        for filename in filenames {
-            let file = File::open(filename)?;
-            let file_reader = Arc::new(SerializedFileReader::new(file)?);
-            let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
-            let meta_data = arrow_reader.get_metadata();
-            // collect all the unique schemas in this data set
-            let schema = arrow_reader.get_schema()?;
-            if schemas.is_empty() || schema != schemas[0] {
-                schemas.push(schema);
-            }
-            let mut num_rows = 0;
-            let mut total_byte_size = 0;
-            for i in 0..meta_data.num_row_groups() {
-                let row_group_meta = meta_data.row_group(i);
-                num_rows += row_group_meta.num_rows();
-                total_byte_size += row_group_meta.total_byte_size();
+        let mut partitions = Vec::with_capacity(max_concurrency);
+        let filenames: Vec<String> = filenames.iter().map(|s| s.to_string()).collect();
+        let chunks = split_files(&filenames, max_concurrency);

Review comment:
       Thanks. I filed https://issues.apache.org/jira/browse/ARROW-11056 for splitting on row groups. 







[GitHub] [arrow] Dandandan commented on a change in pull request #9029: ARROW-10995: [Rust] [DataFusion] Limit ParquetExec concurrency when reading large number of files

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #9029:
URL: https://github.com/apache/arrow/pull/9029#discussion_r549435152



##########
File path: rust/datafusion/src/physical_plan/parquet.rs
##########
@@ -104,35 +105,41 @@ impl ParquetExec {
         filenames: &[&str],
         projection: Option<Vec<usize>>,
         batch_size: usize,
+        max_concurrency: usize,
     ) -> Result<Self> {
         // build a list of Parquet partitions with statistics and gather all unique schemas
         // used in this data set
         let mut schemas: Vec<Schema> = vec![];
-        let mut partitions = Vec::with_capacity(filenames.len());
-        for filename in filenames {
-            let file = File::open(filename)?;
-            let file_reader = Arc::new(SerializedFileReader::new(file)?);
-            let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
-            let meta_data = arrow_reader.get_metadata();
-            // collect all the unique schemas in this data set
-            let schema = arrow_reader.get_schema()?;
-            if schemas.is_empty() || schema != schemas[0] {
-                schemas.push(schema);
-            }
-            let mut num_rows = 0;
-            let mut total_byte_size = 0;
-            for i in 0..meta_data.num_row_groups() {
-                let row_group_meta = meta_data.row_group(i);
-                num_rows += row_group_meta.num_rows();
-                total_byte_size += row_group_meta.total_byte_size();
+        let mut partitions = Vec::with_capacity(max_concurrency);
+        let filenames: Vec<String> = filenames.iter().map(|s| s.to_string()).collect();
+        let chunks = split_files(&filenames, max_concurrency);

Review comment:
       What happens if certain chunks (combinations of file names) are much bigger than others? I guess the query will then not finish as fast as it could if all the threads were kept busy.
   I don't know much about this part of the code base, so maybe this doesn't make sense, but I think the ideal approach would be to keep the work fine-grained at the file level (or even smaller, where possible and beneficial) and have a (configured) runtime divide the work.







[GitHub] [arrow] andygrove commented on a change in pull request #9029: ARROW-10995: [Rust] [DataFusion] Limit ParquetExec concurrency when reading large number of files

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #9029:
URL: https://github.com/apache/arrow/pull/9029#discussion_r549456810



##########
File path: rust/datafusion/src/physical_plan/parquet.rs
##########
@@ -104,35 +105,41 @@ impl ParquetExec {
         filenames: &[&str],
         projection: Option<Vec<usize>>,
         batch_size: usize,
+        max_concurrency: usize,
     ) -> Result<Self> {
         // build a list of Parquet partitions with statistics and gather all unique schemas
         // used in this data set
         let mut schemas: Vec<Schema> = vec![];
-        let mut partitions = Vec::with_capacity(filenames.len());
-        for filename in filenames {
-            let file = File::open(filename)?;
-            let file_reader = Arc::new(SerializedFileReader::new(file)?);
-            let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
-            let meta_data = arrow_reader.get_metadata();
-            // collect all the unique schemas in this data set
-            let schema = arrow_reader.get_schema()?;
-            if schemas.is_empty() || schema != schemas[0] {
-                schemas.push(schema);
-            }
-            let mut num_rows = 0;
-            let mut total_byte_size = 0;
-            for i in 0..meta_data.num_row_groups() {
-                let row_group_meta = meta_data.row_group(i);
-                num_rows += row_group_meta.num_rows();
-                total_byte_size += row_group_meta.total_byte_size();
+        let mut partitions = Vec::with_capacity(max_concurrency);
+        let filenames: Vec<String> = filenames.iter().map(|s| s.to_string()).collect();
+        let chunks = split_files(&filenames, max_concurrency);

Review comment:
       It is true, we could sort the list by row count first and distribute the chunks more evenly. I will look at doing that.
   
   Unfortunately we can't use tokio for these tasks (as far as I know); we have to use threads because the parquet crate does not support async, and threads are quite heavyweight if we launch too many of them. It can also be problematic to open thousands of files at the same time, since we can hit file handle limits, so it is important to put some limit on concurrency.
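
   As a rough illustration of the "sort by row count, then distribute" idea (a sketch only; the function name and the pre-computed row counts are hypothetical, and this is not code from the PR), a greedy longest-processing-time pass could look like this:

       /// Hypothetical sketch: assign files to `n` partitions so that total
       /// row counts stay roughly balanced (greedy longest-processing-time).
       fn balance_by_rows(mut files: Vec<(String, i64)>, n: usize) -> Vec<Vec<String>> {
           // largest files first keeps the greedy assignment close to balanced
           files.sort_by(|a, b| b.1.cmp(&a.1));
           let mut partitions: Vec<(i64, Vec<String>)> = vec![(0, Vec::new()); n];
           for (name, rows) in files {
               // place each file in the currently lightest partition
               let lightest = partitions
                   .iter_mut()
                   .min_by_key(|p| p.0)
                   .expect("n must be greater than zero");
               lightest.0 += rows;
               lightest.1.push(name);
           }
           partitions.into_iter().map(|(_, names)| names).collect()
       }

       fn main() {
           let files = vec![
               ("a.parquet".to_string(), 1_000_000),
               ("b.parquet".to_string(), 10_000),
               ("c.parquet".to_string(), 500_000),
               ("d.parquet".to_string(), 490_000),
           ];
           for (i, p) in balance_by_rows(files, 2).iter().enumerate() {
               println!("partition {}: {:?}", i, p);
           }
       }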
   
   







[GitHub] [arrow] alamb commented on a change in pull request #9029: ARROW-10995: [Rust] [DataFusion] Limit ParquetExec concurrency when reading large number of files

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #9029:
URL: https://github.com/apache/arrow/pull/9029#discussion_r549473140



##########
File path: rust/datafusion/src/physical_plan/parquet.rs
##########
@@ -104,35 +105,41 @@ impl ParquetExec {
         filenames: &[&str],
         projection: Option<Vec<usize>>,
         batch_size: usize,
+        max_concurrency: usize,
     ) -> Result<Self> {
         // build a list of Parquet partitions with statistics and gather all unique schemas
         // used in this data set
         let mut schemas: Vec<Schema> = vec![];
-        let mut partitions = Vec::with_capacity(filenames.len());
-        for filename in filenames {
-            let file = File::open(filename)?;
-            let file_reader = Arc::new(SerializedFileReader::new(file)?);
-            let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
-            let meta_data = arrow_reader.get_metadata();
-            // collect all the unique schemas in this data set
-            let schema = arrow_reader.get_schema()?;
-            if schemas.is_empty() || schema != schemas[0] {
-                schemas.push(schema);
-            }
-            let mut num_rows = 0;
-            let mut total_byte_size = 0;
-            for i in 0..meta_data.num_row_groups() {
-                let row_group_meta = meta_data.row_group(i);
-                num_rows += row_group_meta.num_rows();
-                total_byte_size += row_group_meta.total_byte_size();
+        let mut partitions = Vec::with_capacity(max_concurrency);
+        let filenames: Vec<String> = filenames.iter().map(|s| s.to_string()).collect();
+        let chunks = split_files(&filenames, max_concurrency);
+        let mut num_rows = 0;
+        let mut total_byte_size = 0;
+        for chunk in chunks {

Review comment:
       Another strategy you might consider, which might be less susceptible to long-tail splits, would be to put all the filenames into a channel (or other sort of queue) and have each thread read from the queue when it is done with its previous work item. That way you wouldn't have to try to balance the work up front.
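
   A minimal sketch of that queue idea, using a plain std::sync::mpsc channel shared behind a mutex (the worker body is a placeholder; a real worker would open and scan the Parquet file):

       use std::sync::{mpsc, Arc, Mutex};
       use std::thread;

       fn main() {
           let filenames: Vec<String> =
               (0..100).map(|i| format!("part-{}.parquet", i)).collect();
           let max_concurrency = 4;

           // enqueue every filename up front, then drop the sender so the
           // workers stop once the queue drains
           let (tx, rx) = mpsc::channel::<String>();
           for f in filenames {
               tx.send(f).unwrap();
           }
           drop(tx);

           let rx = Arc::new(Mutex::new(rx));
           let handles: Vec<_> = (0..max_concurrency)
               .map(|_| {
                   let rx = Arc::clone(&rx);
                   thread::spawn(move || loop {
                       // hold the lock only long enough to pull one filename,
                       // so other workers are free while this one processes it
                       let next = rx.lock().unwrap().recv();
                       match next {
                           Ok(filename) => println!("processing {}", filename),
                           Err(_) => break, // queue empty and sender dropped
                       }
                   })
               })
               .collect();

           for h in handles {
               h.join().unwrap();
           }
       }

   Each thread pulls its next file as soon as it finishes the previous one, so an unlucky up-front split cannot strand a large tail of work on a single thread.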







[GitHub] [arrow] Dandandan commented on a change in pull request #9029: ARROW-10995: [Rust] [DataFusion] Limit ParquetExec concurrency when reading large number of files

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #9029:
URL: https://github.com/apache/arrow/pull/9029#discussion_r549561500



##########
File path: rust/datafusion/src/physical_plan/parquet.rs
##########
@@ -104,35 +105,41 @@ impl ParquetExec {
         filenames: &[&str],
         projection: Option<Vec<usize>>,
         batch_size: usize,
+        max_concurrency: usize,
     ) -> Result<Self> {
         // build a list of Parquet partitions with statistics and gather all unique schemas
         // used in this data set
         let mut schemas: Vec<Schema> = vec![];
-        let mut partitions = Vec::with_capacity(filenames.len());
-        for filename in filenames {
-            let file = File::open(filename)?;
-            let file_reader = Arc::new(SerializedFileReader::new(file)?);
-            let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
-            let meta_data = arrow_reader.get_metadata();
-            // collect all the unique schemas in this data set
-            let schema = arrow_reader.get_schema()?;
-            if schemas.is_empty() || schema != schemas[0] {
-                schemas.push(schema);
-            }
-            let mut num_rows = 0;
-            let mut total_byte_size = 0;
-            for i in 0..meta_data.num_row_groups() {
-                let row_group_meta = meta_data.row_group(i);
-                num_rows += row_group_meta.num_rows();
-                total_byte_size += row_group_meta.total_byte_size();
+        let mut partitions = Vec::with_capacity(max_concurrency);
+        let filenames: Vec<String> = filenames.iter().map(|s| s.to_string()).collect();
+        let chunks = split_files(&filenames, max_concurrency);

Review comment:
       Makes sense; the change to limit the number of threads is useful already. The idea of @alamb to send the work items over a channel makes sense to me.







[GitHub] [arrow] alamb commented on a change in pull request #9029: ARROW-10995: [Rust] [DataFusion] Limit ParquetExec concurrency when reading large number of files

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #9029:
URL: https://github.com/apache/arrow/pull/9029#discussion_r549472740



##########
File path: rust/datafusion/src/physical_plan/parquet.rs
##########
@@ -104,35 +105,41 @@ impl ParquetExec {
         filenames: &[&str],
         projection: Option<Vec<usize>>,
         batch_size: usize,
+        max_concurrency: usize,
     ) -> Result<Self> {
         // build a list of Parquet partitions with statistics and gather all unique schemas
         // used in this data set
         let mut schemas: Vec<Schema> = vec![];
-        let mut partitions = Vec::with_capacity(filenames.len());
-        for filename in filenames {
-            let file = File::open(filename)?;
-            let file_reader = Arc::new(SerializedFileReader::new(file)?);
-            let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
-            let meta_data = arrow_reader.get_metadata();
-            // collect all the unique schemas in this data set
-            let schema = arrow_reader.get_schema()?;
-            if schemas.is_empty() || schema != schemas[0] {
-                schemas.push(schema);
-            }
-            let mut num_rows = 0;
-            let mut total_byte_size = 0;
-            for i in 0..meta_data.num_row_groups() {
-                let row_group_meta = meta_data.row_group(i);
-                num_rows += row_group_meta.num_rows();
-                total_byte_size += row_group_meta.total_byte_size();
+        let mut partitions = Vec::with_capacity(max_concurrency);
+        let filenames: Vec<String> = filenames.iter().map(|s| s.to_string()).collect();
+        let chunks = split_files(&filenames, max_concurrency);

Review comment:
       I suggest filing a follow-on ticket to try to split work across `ColumnChunks` rather than files -- I think starting with just files is reasonable.
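
   A sketch of what finer-grained work items could look like, expanding each file into one task per row group (the names and the pre-read row-group counts here are hypothetical; ARROW-11056 tracks the real follow-up):

       /// Hypothetical sketch: expand each file into per-row-group work items
       /// so a scheduler can balance at a finer granularity than whole files.
       /// The row-group counts stand in for reading the Parquet footer.
       fn row_group_tasks(files: &[(String, usize)]) -> Vec<(String, usize)> {
           files
               .iter()
               .flat_map(|(name, row_groups)| {
                   (0..*row_groups).map(move |rg| (name.clone(), rg))
               })
               .collect()
       }

       fn main() {
           let files = vec![("a.parquet".to_string(), 3), ("b.parquet".to_string(), 1)];
           for (file, rg) in row_group_tasks(&files) {
               println!("read row group {} of {}", rg, file);
           }
       }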







[GitHub] [arrow] andygrove commented on a change in pull request #9029: ARROW-10995: [Rust] [DataFusion] Limit ParquetExec concurrency when reading large number of files

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #9029:
URL: https://github.com/apache/arrow/pull/9029#discussion_r549526242



##########
File path: rust/datafusion/src/physical_plan/parquet.rs
##########
@@ -104,35 +105,41 @@ impl ParquetExec {
         filenames: &[&str],
         projection: Option<Vec<usize>>,
         batch_size: usize,
+        max_concurrency: usize,
     ) -> Result<Self> {
         // build a list of Parquet partitions with statistics and gather all unique schemas
         // used in this data set
         let mut schemas: Vec<Schema> = vec![];
-        let mut partitions = Vec::with_capacity(filenames.len());
-        for filename in filenames {
-            let file = File::open(filename)?;
-            let file_reader = Arc::new(SerializedFileReader::new(file)?);
-            let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
-            let meta_data = arrow_reader.get_metadata();
-            // collect all the unique schemas in this data set
-            let schema = arrow_reader.get_schema()?;
-            if schemas.is_empty() || schema != schemas[0] {
-                schemas.push(schema);
-            }
-            let mut num_rows = 0;
-            let mut total_byte_size = 0;
-            for i in 0..meta_data.num_row_groups() {
-                let row_group_meta = meta_data.row_group(i);
-                num_rows += row_group_meta.num_rows();
-                total_byte_size += row_group_meta.total_byte_size();
+        let mut partitions = Vec::with_capacity(max_concurrency);
+        let filenames: Vec<String> = filenames.iter().map(|s| s.to_string()).collect();
+        let chunks = split_files(&filenames, max_concurrency);
+        let mut num_rows = 0;
+        let mut total_byte_size = 0;
+        for chunk in chunks {

Review comment:
       I do like that idea. I'll give that some thought.







[GitHub] [arrow] github-actions[bot] commented on pull request #9029: ARROW-10995: [Rust] [DataFusion] Limit ParquetExec concurrency when reading large number of files

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #9029:
URL: https://github.com/apache/arrow/pull/9029#issuecomment-751789556


   https://issues.apache.org/jira/browse/ARROW-10995





[GitHub] [arrow] andygrove closed pull request #9029: ARROW-10995: [Rust] [DataFusion] Limit ParquetExec concurrency when reading large number of files

Posted by GitBox <gi...@apache.org>.
andygrove closed pull request #9029:
URL: https://github.com/apache/arrow/pull/9029


   





[GitHub] [arrow] andygrove commented on a change in pull request #9029: ARROW-10995: [Rust] [DataFusion] Limit ParquetExec concurrency when reading large number of files

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #9029:
URL: https://github.com/apache/arrow/pull/9029#discussion_r549523560



##########
File path: rust/datafusion/src/physical_plan/parquet.rs
##########
@@ -104,35 +105,41 @@ impl ParquetExec {
         filenames: &[&str],
         projection: Option<Vec<usize>>,
         batch_size: usize,
+        max_concurrency: usize,
     ) -> Result<Self> {
         // build a list of Parquet partitions with statistics and gather all unique schemas
         // used in this data set
         let mut schemas: Vec<Schema> = vec![];
-        let mut partitions = Vec::with_capacity(filenames.len());
-        for filename in filenames {
-            let file = File::open(filename)?;
-            let file_reader = Arc::new(SerializedFileReader::new(file)?);
-            let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
-            let meta_data = arrow_reader.get_metadata();
-            // collect all the unique schemas in this data set
-            let schema = arrow_reader.get_schema()?;
-            if schemas.is_empty() || schema != schemas[0] {
-                schemas.push(schema);
-            }
-            let mut num_rows = 0;
-            let mut total_byte_size = 0;
-            for i in 0..meta_data.num_row_groups() {
-                let row_group_meta = meta_data.row_group(i);
-                num_rows += row_group_meta.num_rows();
-                total_byte_size += row_group_meta.total_byte_size();
+        let mut partitions = Vec::with_capacity(max_concurrency);
+        let filenames: Vec<String> = filenames.iter().map(|s| s.to_string()).collect();
+        let chunks = split_files(&filenames, max_concurrency);

Review comment:
       @Dandandan I started looking at bin-packing algorithms, but I think it might be better to do this as a follow-up. What do you think?



