You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/05/25 16:34:17 UTC

[GitHub] [arrow-datafusion] Dandandan commented on a change in pull request #426: Rewrite pruning logic in terms of PruningStatistics using Array trait (option 2)

Dandandan commented on a change in pull request #426:
URL: https://github.com/apache/arrow-datafusion/pull/426#discussion_r638960571



##########
File path: datafusion/src/physical_plan/parquet.rs
##########
@@ -457,11 +461,102 @@ fn send_result(
     Ok(())
 }
 
+/// Wraps parquet statistics in a way
+/// that implements [`PruningStatistics`]
+struct RowGroupPruningStatistics<'a> {
+    row_group_metadata: &'a [RowGroupMetaData],
+    parquet_schema: &'a Schema,
+}
+
+/// Extract the min/max statistics from a `ParquetStatistics` object
+macro_rules! get_statistic {
+    ($column_statistics:expr, $func:ident, $bytes_func:ident) => {{
+        if !$column_statistics.has_min_max_set() {
+            return None;
+        }
+        match $column_statistics {
+            ParquetStatistics::Boolean(s) => Some(ScalarValue::Boolean(Some(*s.$func()))),
+            ParquetStatistics::Int32(s) => Some(ScalarValue::Int32(Some(*s.$func()))),
+            ParquetStatistics::Int64(s) => Some(ScalarValue::Int64(Some(*s.$func()))),
+            // 96 bit ints not supported
+            ParquetStatistics::Int96(_) => None,
+            ParquetStatistics::Float(s) => Some(ScalarValue::Float32(Some(*s.$func()))),
+            ParquetStatistics::Double(s) => Some(ScalarValue::Float64(Some(*s.$func()))),
+            ParquetStatistics::ByteArray(s) => {
+                let s = std::str::from_utf8(s.$bytes_func())
+                    .map(|s| s.to_string())
+                    .ok();
+                Some(ScalarValue::Utf8(s))
+            }
+            // type not supported yet
+            ParquetStatistics::FixedLenByteArray(_) => None,
+        }
+    }};
+}
+
+// Extract the min or max value calling `func` or `bytes_func` on the ParquetStatistics as appropriate
+macro_rules! get_min_max_values {
+    ($self:expr, $column:expr, $func:ident, $bytes_func:ident) => {{
+        let (column_index, field) = if let Some((v, f)) = $self.parquet_schema.column_with_name($column) {
+            (v, f)
+        } else {
+            // Named column was not present
+            return None
+        };
+
+        let data_type = field.data_type();
+        let null_scalar: ScalarValue = if let Ok(v) = data_type.try_into() {
+            v
+        } else {
+            // DataFusion doesn't have support for ScalarValues of the column type
+            return None
+        };
+
+        let scalar_values : Vec<ScalarValue> = $self.row_group_metadata
+            .iter()
+            .flat_map(|meta| {
+                meta.column(column_index).statistics()
+            })
+            .map(|stats| {
+                get_statistic!(stats, $func, $bytes_func)
+            })
+            .map(|maybe_scalar| {
+                // column either did't have statistics at all or didn't have min/max values
+                maybe_scalar.unwrap_or_else(|| null_scalar.clone())
+            })
+            .collect();

Review comment:
       Collecting to `Vec` might not be necessary here, we could maybe provide it to `ScalarValue::iter_to_array` directly? 

##########
File path: datafusion/src/physical_optimizer/pruning.rs
##########
@@ -141,39 +182,78 @@ impl PruningPredicateBuilder {
             .map(|x| x.unwrap_or(true))
             .collect::<Vec<_>>())
     }
+
+    /// Return a reference to the input schema
+    pub fn schema(&self) -> &SchemaRef {
+        &self.schema
+    }
 }
 
-/// Build a RecordBatch from a list of statistics (currently parquet
-/// [`RowGroupMetadata`] structs), creating arrays, one for each
-/// statistics column, as requested in the stat_column_req parameter.
-fn build_statistics_record_batch(
-    statistics: &[RowGroupMetaData],
-    schema: &Schema,
+/// Build a RecordBatch from a list of statistics, creating arrays,
+/// with one row for each PruningStatistics and columns specified in
+/// in the stat_column_req parameter.
+///
+/// For example, if the requested columns are
+/// ```text
+/// ("s1", Min, Field:s1_min)
+/// ("s2", Max, field:s2_max)
+///```
+///
+/// And the input statistics had
+/// ```text
+/// S1(Min: 5, Max: 10)
+/// S2(Min: 99, Max: 1000)
+/// S3(Min: 1, Max: 2)
+/// ```
+///
+/// Then this function would build a record batch with 2 columns and
+/// one row s1_min and s2_max as follows (s3 is not requested):
+///
+/// ```text
+/// s1_min | s2_max
+/// -------+--------
+///   5    | 1000
+/// ```
+fn build_statistics_record_batch<S: PruningStatistics>(
+    statistics: &S,
     stat_column_req: &[(String, StatisticsType, Field)],
 ) -> Result<RecordBatch> {
     let mut fields = Vec::<Field>::new();
     let mut arrays = Vec::<ArrayRef>::new();
+    // For each needed statistics column:
     for (column_name, statistics_type, stat_field) in stat_column_req {
-        if let Some((column_index, _)) = schema.column_with_name(column_name) {
-            let statistics = statistics
-                .iter()
-                .map(|g| g.column(column_index).statistics())
-                .collect::<Vec<_>>();
-            let array = build_statistics_array(
-                &statistics,
-                *statistics_type,
-                stat_field.data_type(),
-            );
-            fields.push(stat_field.clone());
-            arrays.push(array);
+        let data_type = stat_field.data_type();
+
+        let num_containers = statistics.num_containers();
+
+        let array = match statistics_type {

Review comment:
       So here is the core difference for changing it to array - less work here is needed when the statistics are loaded.
   Trade off there makes sense for me, at least in cases when we can keep the statistics this should be beneficial.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org