Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/12/28 19:11:12 UTC

[GitHub] [arrow] andygrove commented on a change in pull request #9029: ARROW-10995: [Rust] [DataFusion] Limit ParquetExec concurrency when reading large number of files

andygrove commented on a change in pull request #9029:
URL: https://github.com/apache/arrow/pull/9029#discussion_r549456810



##########
File path: rust/datafusion/src/physical_plan/parquet.rs
##########
@@ -104,35 +105,41 @@ impl ParquetExec {
         filenames: &[&str],
         projection: Option<Vec<usize>>,
         batch_size: usize,
+        max_concurrency: usize,
     ) -> Result<Self> {
         // build a list of Parquet partitions with statistics and gather all unique schemas
         // used in this data set
         let mut schemas: Vec<Schema> = vec![];
-        let mut partitions = Vec::with_capacity(filenames.len());
-        for filename in filenames {
-            let file = File::open(filename)?;
-            let file_reader = Arc::new(SerializedFileReader::new(file)?);
-            let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
-            let meta_data = arrow_reader.get_metadata();
-            // collect all the unique schemas in this data set
-            let schema = arrow_reader.get_schema()?;
-            if schemas.is_empty() || schema != schemas[0] {
-                schemas.push(schema);
-            }
-            let mut num_rows = 0;
-            let mut total_byte_size = 0;
-            for i in 0..meta_data.num_row_groups() {
-                let row_group_meta = meta_data.row_group(i);
-                num_rows += row_group_meta.num_rows();
-                total_byte_size += row_group_meta.total_byte_size();
+        let mut partitions = Vec::with_capacity(max_concurrency);
+        let filenames: Vec<String> = filenames.iter().map(|s| s.to_string()).collect();
+        let chunks = split_files(&filenames, max_concurrency);

Review comment:
       That's true: we could sort the list by row count first and distribute the files across the chunks more evenly. I will look at doing that.
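
   As a rough sketch of what balancing by row count could look like (this is not the code in the PR): `split_files_by_rows` is a hypothetical name, and it assumes the per-file row counts have already been read from the Parquet metadata and are passed in alongside the filenames. It places the largest files first, each into the chunk that currently holds the fewest rows.

```rust
/// Hypothetical helper, not part of the PR: split files into at most
/// `max_concurrency` chunks with roughly equal total row counts.
/// Each input entry is (filename, row count); the row counts are assumed
/// to have been collected from the file metadata beforehand.
fn split_files_by_rows(files: &[(String, u64)], max_concurrency: usize) -> Vec<Vec<String>> {
    if files.is_empty() {
        return Vec::new();
    }
    // Never create more chunks than files, and always at least one chunk.
    let n = max_concurrency.max(1).min(files.len());

    // Sort largest first so the big files are placed before the small ones.
    let mut sorted: Vec<&(String, u64)> = files.iter().collect();
    sorted.sort_by(|a, b| b.1.cmp(&a.1));

    let mut chunks: Vec<Vec<String>> = vec![Vec::new(); n];
    let mut loads = vec![0u64; n];
    for (name, rows) in sorted {
        // Pick the chunk with the smallest running row total (first one on ties).
        let (idx, _) = loads
            .iter()
            .enumerate()
            .min_by_key(|(_, load)| **load)
            .expect("n is at least 1");
        chunks[idx].push(name.clone());
        loads[idx] += *rows;
    }
    chunks
}
```

   The greedy placement does not guarantee a perfect balance, but it avoids the case where one chunk ends up with all of the large files.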
   
   Unfortunately, as far as I know, we can't use tokio for these tasks and have to use threads, because the parquet crate does not support async. That makes this quite heavyweight if we launch too many threads. It can also be problematic to open thousands of files at the same time, since we can hit file handle limits, so it is important to have some limit on concurrency.
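
   To illustrate the concurrency limit, here is a minimal sketch using only `std::thread`, not the actual `ParquetExec` implementation. `process_chunks` and the `work` callback are hypothetical stand-ins for whatever reads a single file. One thread is spawned per chunk and each thread handles its files one after another, so the number of threads is bounded by the number of chunks, and so is the number of files open at once if `work` closes each file before returning.

```rust
use std::thread;

/// Hypothetical sketch, not the PR's code: run `work` on every file using
/// one OS thread per chunk. The caller bounds concurrency by bounding the
/// number of chunks. Returns one summed result per chunk, in chunk order.
fn process_chunks<F>(chunks: Vec<Vec<String>>, work: F) -> Vec<usize>
where
    F: Fn(&str) -> usize + Send + Clone + 'static,
{
    let handles: Vec<thread::JoinHandle<usize>> = chunks
        .into_iter()
        .map(|chunk| {
            let work = work.clone();
            // Files within a chunk are processed sequentially on this thread.
            thread::spawn(move || chunk.iter().map(|f| work(f.as_str())).sum::<usize>())
        })
        .collect();

    // Wait for every worker and collect its result.
    handles
        .into_iter()
        .map(|h| h.join().expect("worker thread panicked"))
        .collect()
}
```

   With `max_concurrency` chunks this never launches more than `max_concurrency` threads, regardless of how many files are in the data set.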
   
   



