You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/08/17 21:28:30 UTC

[GitHub] [arrow-datafusion] alamb commented on a change in pull request #897: Refactor ParquetExec in preparation for implementing parallel scans for statistics

alamb commented on a change in pull request #897:
URL: https://github.com/apache/arrow-datafusion/pull/897#discussion_r690734383



##########
File path: datafusion/src/physical_plan/parquet.rs
##########
@@ -582,14 +389,244 @@ impl ParquetExec {
 
 impl ParquetPartition {
     /// Create a new parquet partition
-    pub fn new(filenames: Vec<String>, statistics: Statistics) -> Self {
+    pub fn new(
+        filenames: Vec<String>,
+        schemas: Vec<SchemaRef>,
+        statistics: Statistics,
+    ) -> Self {
         Self {
             filenames,
+            schemas,
             statistics,
             metrics: ParquetPartitionMetrics::new(),
         }
     }
 
+    /// Create a new parquet partition by scanning a set of files
+    pub fn try_from_files(filenames: Vec<String>, limit: Option<usize>) -> Result<Self> {
+        let mut num_rows = 0;
+        let mut num_fields = 0;
+        let mut fields = Vec::new();
+        let mut total_byte_size = 0;
+        let mut null_counts = Vec::new();
+        let mut max_values: Vec<Option<MaxAccumulator>> = Vec::new();
+        let mut min_values: Vec<Option<MinAccumulator>> = Vec::new();
+        let mut total_files = 0;
+        let mut schemas: Vec<SchemaRef> = vec![];
+        for filename in &filenames {
+            total_files += 1;
+            let file = File::open(filename)?;
+            let file_reader = Arc::new(SerializedFileReader::new(file)?);
+            let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
+            let meta_data = arrow_reader.get_metadata();
+            // collect all the unique schemas in this data set
+            let schema = arrow_reader.get_schema()?;
+            if schemas.is_empty() || &schema != schemas[0].as_ref() {
+                fields = schema.fields().to_vec();
+                num_fields = schema.fields().len();
+                null_counts = vec![0; num_fields];
+                max_values = schema
+                    .fields()
+                    .iter()
+                    .map(|field| MaxAccumulator::try_new(field.data_type()).ok())
+                    .collect::<Vec<_>>();
+                min_values = schema
+                    .fields()
+                    .iter()
+                    .map(|field| MinAccumulator::try_new(field.data_type()).ok())
+                    .collect::<Vec<_>>();
+                schemas.push(Arc::new(schema));
+            }
+
+            for row_group_meta in meta_data.row_groups() {
+                num_rows += row_group_meta.num_rows();
+                total_byte_size += row_group_meta.total_byte_size();
+
+                // Currently assumes every Parquet file has same schema
+                // https://issues.apache.org/jira/browse/ARROW-11017
+                let columns_null_counts = row_group_meta
+                    .columns()
+                    .iter()
+                    .flat_map(|c| c.statistics().map(|stats| stats.null_count()));
+
+                for (i, cnt) in columns_null_counts.enumerate() {
+                    null_counts[i] += cnt
+                }
+
+                for (i, column) in row_group_meta.columns().iter().enumerate() {
+                    if let Some(stat) = column.statistics() {
+                        match stat {
+                            ParquetStatistics::Boolean(s) => {
+                                if let DataType::Boolean = fields[i].data_type() {
+                                    if s.has_min_max_set() {
+                                        if let Some(max_value) = &mut max_values[i] {
+                                            match max_value.update(&[
+                                                ScalarValue::Boolean(Some(*s.max())),
+                                            ]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    max_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                        if let Some(min_value) = &mut min_values[i] {
+                                            match min_value.update(&[
+                                                ScalarValue::Boolean(Some(*s.min())),
+                                            ]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    min_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                            ParquetStatistics::Int32(s) => {
+                                if let DataType::Int32 = fields[i].data_type() {
+                                    if s.has_min_max_set() {
+                                        if let Some(max_value) = &mut max_values[i] {
+                                            match max_value.update(&[ScalarValue::Int32(
+                                                Some(*s.max()),
+                                            )]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    max_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                        if let Some(min_value) = &mut min_values[i] {
+                                            match min_value.update(&[ScalarValue::Int32(
+                                                Some(*s.min()),
+                                            )]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    min_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                            ParquetStatistics::Int64(s) => {
+                                if let DataType::Int64 = fields[i].data_type() {
+                                    if s.has_min_max_set() {
+                                        if let Some(max_value) = &mut max_values[i] {
+                                            match max_value.update(&[ScalarValue::Int64(
+                                                Some(*s.max()),
+                                            )]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    max_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                        if let Some(min_value) = &mut min_values[i] {
+                                            match min_value.update(&[ScalarValue::Int64(
+                                                Some(*s.min()),
+                                            )]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    min_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                            ParquetStatistics::Float(s) => {
+                                if let DataType::Float32 = fields[i].data_type() {
+                                    if s.has_min_max_set() {
+                                        if let Some(max_value) = &mut max_values[i] {
+                                            match max_value.update(&[
+                                                ScalarValue::Float32(Some(*s.max())),
+                                            ]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    max_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                        if let Some(min_value) = &mut min_values[i] {
+                                            match min_value.update(&[
+                                                ScalarValue::Float32(Some(*s.min())),
+                                            ]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    min_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                            ParquetStatistics::Double(s) => {
+                                if let DataType::Float64 = fields[i].data_type() {
+                                    if s.has_min_max_set() {
+                                        if let Some(max_value) = &mut max_values[i] {
+                                            match max_value.update(&[
+                                                ScalarValue::Float64(Some(*s.max())),
+                                            ]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    max_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                        if let Some(min_value) = &mut min_values[i] {
+                                            match min_value.update(&[
+                                                ScalarValue::Float64(Some(*s.min())),
+                                            ]) {
+                                                Ok(_) => {}
+                                                Err(_) => {
+                                                    min_values[i] = None;
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                            _ => {}
+                        }
+                    }
+                }
+
+                if limit.map(|x| num_rows >= x as i64).unwrap_or(false) {
+                    break;
+                }
+            }
+        }
+        let column_stats = (0..num_fields)
+            .map(|i| {
+                let max_value = match &max_values[i] {
+                    Some(max_value) => max_value.evaluate().ok(),
+                    None => None,
+                };
+                let min_value = match &min_values[i] {
+                    Some(min_value) => min_value.evaluate().ok(),
+                    None => None,
+                };
+                ColumnStatistics {
+                    null_count: Some(null_counts[i] as usize),
+                    max_value,
+                    min_value,
+                    distinct_count: None,
+                }
+            })
+            .collect();
+
+        let statistics = Statistics {
+            num_rows: Some(num_rows as usize),
+            total_byte_size: Some(total_byte_size as usize),
+            column_statistics: Some(column_stats),
+        };
+        // remove files that are not needed in case of limit
+        let mut filenames = filenames;
+        filenames.truncate(total_files);

Review comment:
       Do you also need to truncate `schemas`?

##########
File path: datafusion/src/physical_plan/parquet.rs
##########
@@ -582,14 +389,244 @@ impl ParquetExec {
 
 impl ParquetPartition {
     /// Create a new parquet partition
-    pub fn new(filenames: Vec<String>, statistics: Statistics) -> Self {
+    pub fn new(
+        filenames: Vec<String>,

Review comment:
       I wonder whether a single `Vec<FilenameAndSchema>` would be clearer than two parallel vectors (it would also make it impossible for them to get out of sync).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org