Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/08/16 14:25:07 UTC

[GitHub] [arrow-datafusion] andygrove commented on a change in pull request #897: Refactor ParquetExec in preparation for implementing parallel scans for statistics

andygrove commented on a change in pull request #897:
URL: https://github.com/apache/arrow-datafusion/pull/897#discussion_r689584504



##########
File path: datafusion/src/physical_plan/parquet.rs
##########
@@ -167,235 +169,40 @@ impl ParquetExec {
                filenames, projection, predicate, limit);
         // build a list of Parquet partitions with statistics and gather all unique schemas
         // used in this data set
-        let mut schemas: Vec<Schema> = vec![];
+        let mut schemas: Vec<SchemaRef> = vec![];
         let mut partitions = Vec::with_capacity(max_concurrency);
         let filenames: Vec<String> = filenames.iter().map(|s| s.to_string()).collect();
         let chunks = split_files(&filenames, max_concurrency);
-        let mut num_rows = 0;
-        let mut num_fields = 0;
-        let mut fields = Vec::new();
-        let mut total_byte_size = 0;
-        let mut null_counts = Vec::new();
-        let mut max_values: Vec<Option<MaxAccumulator>> = Vec::new();
-        let mut min_values: Vec<Option<MinAccumulator>> = Vec::new();
-        let mut limit_exhausted = false;
-        for chunk in chunks {
-            let mut filenames: Vec<String> =
-                chunk.iter().map(|x| x.to_string()).collect();
-            let mut total_files = 0;
-            for filename in &filenames {
-                total_files += 1;
-                let file = File::open(filename)?;
-                let file_reader = Arc::new(SerializedFileReader::new(file)?);
-                let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
-                let meta_data = arrow_reader.get_metadata();
-                // collect all the unique schemas in this data set
-                let schema = arrow_reader.get_schema()?;
-                if schemas.is_empty() || schema != schemas[0] {
-                    fields = schema.fields().to_vec();
-                    num_fields = schema.fields().len();
-                    null_counts = vec![0; num_fields];
-                    max_values = schema
-                        .fields()
-                        .iter()
-                        .map(|field| MaxAccumulator::try_new(field.data_type()).ok())
-                        .collect::<Vec<_>>();
-                    min_values = schema
-                        .fields()
-                        .iter()
-                        .map(|field| MinAccumulator::try_new(field.data_type()).ok())
-                        .collect::<Vec<_>>();
-                    schemas.push(schema);
-                }
 
-                for row_group_meta in meta_data.row_groups() {
-                    num_rows += row_group_meta.num_rows();
-                    total_byte_size += row_group_meta.total_byte_size();
-
-                    // Currently assumes every Parquet file has same schema
-                    // https://issues.apache.org/jira/browse/ARROW-11017
-                    let columns_null_counts = row_group_meta
-                        .columns()
-                        .iter()
-                        .flat_map(|c| c.statistics().map(|stats| stats.null_count()));
-
-                    for (i, cnt) in columns_null_counts.enumerate() {
-                        null_counts[i] += cnt
-                    }
-
-                    for (i, column) in row_group_meta.columns().iter().enumerate() {
-                        if let Some(stat) = column.statistics() {
-                            match stat {
-                                ParquetStatistics::Boolean(s) => {
-                                    if let DataType::Boolean = fields[i].data_type() {
-                                        if s.has_min_max_set() {
-                                            if let Some(max_value) = &mut max_values[i] {
-                                                match max_value.update(&[
-                                                    ScalarValue::Boolean(Some(*s.max())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        max_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                            if let Some(min_value) = &mut min_values[i] {
-                                                match min_value.update(&[
-                                                    ScalarValue::Boolean(Some(*s.min())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        min_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                        }
-                                    }
-                                }
-                                ParquetStatistics::Int32(s) => {
-                                    if let DataType::Int32 = fields[i].data_type() {
-                                        if s.has_min_max_set() {
-                                            if let Some(max_value) = &mut max_values[i] {
-                                                match max_value.update(&[
-                                                    ScalarValue::Int32(Some(*s.max())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        max_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                            if let Some(min_value) = &mut min_values[i] {
-                                                match min_value.update(&[
-                                                    ScalarValue::Int32(Some(*s.min())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        min_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                        }
-                                    }
-                                }
-                                ParquetStatistics::Int64(s) => {
-                                    if let DataType::Int64 = fields[i].data_type() {
-                                        if s.has_min_max_set() {
-                                            if let Some(max_value) = &mut max_values[i] {
-                                                match max_value.update(&[
-                                                    ScalarValue::Int64(Some(*s.max())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        max_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                            if let Some(min_value) = &mut min_values[i] {
-                                                match min_value.update(&[
-                                                    ScalarValue::Int64(Some(*s.min())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        min_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                        }
-                                    }
-                                }
-                                ParquetStatistics::Float(s) => {
-                                    if let DataType::Float32 = fields[i].data_type() {
-                                        if s.has_min_max_set() {
-                                            if let Some(max_value) = &mut max_values[i] {
-                                                match max_value.update(&[
-                                                    ScalarValue::Float32(Some(*s.max())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        max_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                            if let Some(min_value) = &mut min_values[i] {
-                                                match min_value.update(&[
-                                                    ScalarValue::Float32(Some(*s.min())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        min_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                        }
-                                    }
-                                }
-                                ParquetStatistics::Double(s) => {
-                                    if let DataType::Float64 = fields[i].data_type() {
-                                        if s.has_min_max_set() {
-                                            if let Some(max_value) = &mut max_values[i] {
-                                                match max_value.update(&[
-                                                    ScalarValue::Float64(Some(*s.max())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        max_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                            if let Some(min_value) = &mut min_values[i] {
-                                                match min_value.update(&[
-                                                    ScalarValue::Float64(Some(*s.min())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        min_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                        }
-                                    }
-                                }
-                                _ => {}
-                            }
-                        }
-                    }
+        // collect statistics for all partitions - we should really do this in parallel threads
+        for chunk in chunks {
+            let filenames: Vec<String> = chunk.iter().map(|x| x.to_string()).collect();
+            partitions.push(ParquetPartition::try_from_files(filenames, limit)?);

Review comment:
       The plan is to use `tokio::spawn` here, but that requires making more methods async, so I wanted to tackle it in a separate PR.
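
       For illustration only (not part of this PR), a minimal sketch of what that follow-up could look like: one `tokio::spawn` task per chunk of files, with the results joined back into a `Vec` of partitions. `Partition` and `collect_partition_stats` are hypothetical stand-ins for `ParquetPartition` and `ParquetPartition::try_from_files`, which would need to become async (or be wrapped in `tokio::task::spawn_blocking`, since the metadata reads are blocking file I/O). Assumes tokio 1.x with the `macros` and `rt-multi-thread` features; the real code would surface `DataFusionError` rather than `io::Error`.

```rust
use std::io;

#[derive(Debug)]
struct Partition {
    filenames: Vec<String>,
}

// Stand-in for the per-chunk work: open each file, read the Parquet
// metadata, and accumulate row counts, byte sizes, and min/max statistics.
async fn collect_partition_stats(
    filenames: Vec<String>,
    _limit: Option<usize>,
) -> io::Result<Partition> {
    Ok(Partition { filenames })
}

async fn build_partitions(
    chunks: Vec<Vec<String>>,
    limit: Option<usize>,
) -> io::Result<Vec<Partition>> {
    // Spawn one task per chunk so the metadata reads proceed concurrently.
    let handles: Vec<_> = chunks
        .into_iter()
        .map(|chunk| tokio::spawn(collect_partition_stats(chunk, limit)))
        .collect();

    // Await each task; a JoinError (panic/cancellation) is mapped to io::Error here.
    let mut partitions = Vec::with_capacity(handles.len());
    for handle in handles {
        let partition = handle
            .await
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))??;
        partitions.push(partition);
    }
    Ok(partitions)
}

#[tokio::main]
async fn main() -> io::Result<()> {
    let chunks = vec![
        vec!["a.parquet".to_string()],
        vec!["b.parquet".to_string()],
    ];
    let partitions = build_partitions(chunks, None).await?;
    println!("built {} partitions", partitions.len());
    Ok(())
}
```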



