You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/10/13 13:33:39 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #3780: Implement parquet page-level skipping with column index, using min/ma…

alamb commented on code in PR #3780:
URL: https://github.com/apache/arrow-datafusion/pull/3780#discussion_r994592864


##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -749,6 +811,61 @@ macro_rules! get_null_count_values {
     }};
 }
 
+// Extract the min or max value calling `func` from page idex
+macro_rules! get_min_max_values_form_page_index {

Review Comment:
   Small typo
   
   ```suggestion
   macro_rules! get_min_max_values_for_page_index {
   ```



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -828,6 +996,97 @@ fn prune_row_groups(
     filtered
 }
 
+fn prune_pages_in_one_row_group(
+    group: &RowGroupMetaData,
+    predicate: Option<PruningPredicate>,
+    offset_indexes: Option<&Vec<Vec<PageLocation>>>,
+    page_indexes: Option<&Vec<Index>>,
+    metrics: &ParquetFileMetrics,
+) -> Result<Vec<RowSelector>> {
+    let num_rows = group.num_rows() as usize;
+    if let (Some(predicate), Some(offset_indexes), Some(page_indexes)) =
+        (&predicate, offset_indexes, page_indexes)
+    {
+        let pruning_stats = PagesPruningStatistics {
+            page_indexes,
+            offset_indexes,
+            parquet_schema: predicate.schema().as_ref(),
+            // now we assume only support one col.

Review Comment:
   See comments elsewhere on multi-column predicates



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -2154,4 +2413,68 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn parquet_exec_with_page_index_filter() -> Result<()> {
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
+
+        let object_store_url = ObjectStoreUrl::local_filesystem();
+        let store = session_ctx
+            .runtime_env()
+            .object_store(&object_store_url)
+            .unwrap();
+
+        let testdata = crate::test_util::parquet_test_data();
+        let filename = format!("{}/alltypes_tiny_pages.parquet", testdata);
+
+        let meta = local_unpartitioned_file(filename);
+
+        let schema = ParquetFormat::default()
+            .infer_schema(&store, &[meta.clone()])
+            .await
+            .unwrap();
+
+        let partitioned_file = PartitionedFile {
+            object_meta: meta,
+            partition_values: vec![],
+            range: None,
+            extensions: None,
+        };
+
+        // create filter month == 1;
+        let filter = col("month").eq(lit(1_i32));
+        let parquet_exec = ParquetExec::new(
+            FileScanConfig {
+                object_store_url,
+                file_groups: vec![vec![partitioned_file]],
+                file_schema: schema,
+                statistics: Statistics::default(),
+                // file has 10 cols so index 12 should be month
+                projection: None,
+                limit: None,
+                table_partition_cols: vec![],
+            },
+            Some(filter),
+            None,
+        );
+
+        let parquet_exec_page_index = parquet_exec
+            .clone()
+            .with_scan_options(ParquetScanOptions::default().with_page_index(true));
+
+        let mut results = parquet_exec_page_index.execute(0, task_ctx)?;

Review Comment:
   👍



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -460,6 +498,20 @@ impl FileOpener for ParquetOpener {
     }
 }
 
+// Check PruningPredicates just work on one column.
+fn check_page_index_push_down_valid(predicate: &Option<PruningPredicate>) -> bool {
+    if let Some(predicate) = predicate {
+        // for now we only support pushDown on one col, because each col may have different page numbers, its hard to get
+        // `num_containers` from `PruningStatistics`.
+        let cols = predicate.need_input_columns_ids();
+        //Todo more specific rules

Review Comment:
   So among other things, this implies we probably want to break the pruning predicate down into conjuncts (aka split on `AND`) to try and isolate predicates that refer only to a single column



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -785,6 +902,57 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
     }
 }
 
+impl<'a> PruningStatistics for PagesPruningStatistics<'a> {
+    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
+        get_min_max_values_form_page_index!(self, column, min)
+    }
+
+    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
+        get_min_max_values_form_page_index!(self, column, max)
+    }
+
+    fn num_containers(&self) -> usize {
+        self.offset_indexes.get(self.col_id).unwrap().len()
+    }
+
+    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
+        if let Some((col_id_index, _field)) =
+            self.parquet_schema.column_with_name(&column.name)
+        {
+            if let Some(page_index) = self.page_indexes.get(col_id_index) {
+                match page_index {
+                    Index::NONE => None,
+                    Index::BOOLEAN(index) => Some(Arc::new(Int64Array::from_iter(
+                        index.indexes.iter().map(|x| x.null_count),
+                    ))),
+                    Index::INT32(index) => Some(Arc::new(Int64Array::from_iter(
+                        index.indexes.iter().map(|x| x.null_count),
+                    ))),
+                    Index::INT64(index) => Some(Arc::new(Int64Array::from_iter(
+                        index.indexes.iter().map(|x| x.null_count),
+                    ))),
+                    Index::FLOAT(index) => Some(Arc::new(Int64Array::from_iter(
+                        index.indexes.iter().map(|x| x.null_count),
+                    ))),
+                    Index::DOUBLE(index) => Some(Arc::new(Int64Array::from_iter(
+                        index.indexes.iter().map(|x| x.null_count),
+                    ))),
+                    Index::INT96(_)
+                    | Index::BYTE_ARRAY(_)
+                    | Index::FIXED_LEN_BYTE_ARRAY(_) => {
+                        // Todo support these types
+                        None
+                    }
+                }
+            } else {
+                None
+            }
+        } else {
+            None

Review Comment:
   Yes I think so



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -785,6 +902,57 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
     }
 }
 
+impl<'a> PruningStatistics for PagesPruningStatistics<'a> {
+    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
+        get_min_max_values_form_page_index!(self, column, min)
+    }
+
+    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
+        get_min_max_values_form_page_index!(self, column, max)
+    }
+
+    fn num_containers(&self) -> usize {
+        self.offset_indexes.get(self.col_id).unwrap().len()

Review Comment:
   I think the idea of `containers` was that it was the unit of thing that has statistics
   
   So since each RowGroup (may) have statistics, when pruning entire RowGroups the num_containers is the same as the number of RowGroups.
   
   When pruning pages, I would expect the number of containers to be the same as the number of pages that have statistics.



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -828,6 +996,97 @@ fn prune_row_groups(
     filtered
 }
 
+fn prune_pages_in_one_row_group(
+    group: &RowGroupMetaData,
+    predicate: Option<PruningPredicate>,
+    offset_indexes: Option<&Vec<Vec<PageLocation>>>,
+    page_indexes: Option<&Vec<Index>>,
+    metrics: &ParquetFileMetrics,
+) -> Result<Vec<RowSelector>> {

Review Comment:
   I wonder what you think about using `RowSelection` https://docs.rs/parquet/24.0.0/parquet/arrow/arrow_reader/struct.RowSelection.html rather than `Vec<RowSelector>`?
   
   It might be easier to manipulate / update (especially when this is combined with predicate evaluation)



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -460,6 +498,20 @@ impl FileOpener for ParquetOpener {
     }
 }
 
+// Check PruningPredicates just work on one column.
+fn check_page_index_push_down_valid(predicate: &Option<PruningPredicate>) -> bool {
+    if let Some(predicate) = predicate {
+        // for now we only support pushDown on one col, because each col may have different page numbers, its hard to get
+        // `num_containers` from `PruningStatistics`.
+        let cols = predicate.need_input_columns_ids();
+        //Todo more specific rules

Review Comment:
   It also means that the "number of containers" in Column A will be "2" (as it has pages 1 and 2) and the number of containers for Column B will be "3" (as it has pages 3, 4, 5).



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -785,6 +902,57 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
     }
 }
 
+impl<'a> PruningStatistics for PagesPruningStatistics<'a> {
+    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
+        get_min_max_values_form_page_index!(self, column, min)
+    }
+
+    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
+        get_min_max_values_form_page_index!(self, column, max)
+    }
+
+    fn num_containers(&self) -> usize {
+        self.offset_indexes.get(self.col_id).unwrap().len()

Review Comment:
   I tried to explain this more clearly in the diagram



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -749,6 +811,61 @@ macro_rules! get_null_count_values {
     }};
 }
 
+// Extract the min or max value calling `func` from page idex
+macro_rules! get_min_max_values_form_page_index {
+    ($self:expr, $column:expr, $func:ident) => {{
+        if let Some((col_id_index, _field)) =
+            $self.parquet_schema.column_with_name(&$column.name)
+        {
+            if let Some(page_index) = $self.page_indexes.get(col_id_index) {
+                match page_index {
+                    Index::NONE => None,
+                    Index::INT32(index) => {
+                        let vec = &index.indexes;
+                        Some(Arc::new(Int32Array::from_iter(
+                            vec.iter().map(|x| x.$func().cloned()),
+                        )))
+                    }
+                    Index::INT64(index) => {
+                        let vec = &index.indexes;
+                        Some(Arc::new(Int64Array::from_iter(
+                            vec.iter().map(|x| x.$func().cloned()),
+                        )))
+                    }
+                    Index::FLOAT(index) => {
+                        let vec = &index.indexes;
+                        Some(Arc::new(Float32Array::from_iter(
+                            vec.iter().map(|x| x.$func().cloned()),
+                        )))
+                    }
+                    Index::DOUBLE(index) => {
+                        let vec = &index.indexes;
+                        Some(Arc::new(Float64Array::from_iter(
+                            vec.iter().map(|x| x.$func().cloned()),
+                        )))
+                    }
+                    Index::BOOLEAN(index) => {
+                        let vec = &index.indexes;
+                        Some(Arc::new(BooleanArray::from_iter(
+                            vec.iter().map(|x| x.$func().cloned()),
+                        )))
+                    }
+                    Index::INT96(_)
+                    | Index::BYTE_ARRAY(_)
+                    | Index::FIXED_LEN_BYTE_ARRAY(_) => {
+                        //Todo support these type

Review Comment:
   it is probably worth a ticket to track this work



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -460,6 +498,20 @@ impl FileOpener for ParquetOpener {
     }
 }
 
+// Check PruningPredicates just work on one column.
+fn check_page_index_push_down_valid(predicate: &Option<PruningPredicate>) -> bool {
+    if let Some(predicate) = predicate {
+        // for now we only support pushDown on one col, because each col may have different page numbers, its hard to get
+        // `num_containers` from `PruningStatistics`.
+        let cols = predicate.need_input_columns_ids();
+        //Todo more specific rules

Review Comment:
   I don't think there is a guarantee that the data pages for each column break at the same row index -- since the statistics are stored per page (and thus for a single column) I am not sure when/if we'll be able to support multi column predicates (aka predicates that refer to different columns).
   
   I tried to draw a diagram to illustrate what I think is going on:
   
   ```
   ┏━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ 
      ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ┃
   ┃     ┌──────────────┐  │     ┌──────────────┐  │  ┃
   ┃  │  │              │     │  │              │     ┃
   ┃     │              │  │     │     Page     │  │   
      │  │              │     │  │      3       │     ┃
   ┃     │              │  │     │   min: "A"   │  │  ┃
   ┃  │  │              │     │  │   max: "C"   │     ┃
   ┃     │     Page     │  │     │ first_row: 0 │  │   
      │  │      1       │     │  │              │     ┃
   ┃     │   min: 10    │  │     └──────────────┘  │  ┃
   ┃  │  │   max: 20    │     │  ┌──────────────┐     ┃
   ┃     │ first_row: 0 │  │     │              │  │   
      │  │              │     │  │     Page     │     ┃
   ┃     │              │  │     │      4       │  │  ┃
   ┃  │  │              │     │  │   min: "D"   │     ┃
   ┃     │              │  │     │   max: "G"   │  │   
      │  │              │     │  │first_row: 100│     ┃
   ┃     └──────────────┘  │     │              │  │  ┃
   ┃  │  ┌──────────────┐     │  │              │     ┃
   ┃     │              │  │     └──────────────┘  │   
      │  │     Page     │     │  ┌──────────────┐     ┃
   ┃     │      2       │  │     │              │  │  ┃
   ┃  │  │   min: 30    │     │  │     Page     │     ┃
   ┃     │   max: 40    │  │     │      5       │  │   
      │  │first_row: 200│     │  │   min: "H"   │     ┃
   ┃     │              │  │     │   max: "Z"   │  │  ┃
   ┃  │  │              │     │  │first_row: 250│     ┃
   ┃     └──────────────┘  │     │              │  │   
      │                       │  └──────────────┘     ┃
   ┃   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃
   ┃       ColumnChunk            ColumnChunk         ┃
   ┃            A                      B               
    ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━┛
                                                       
                                                       
      Parquet RowGroup with 2 columns, A and B.        
                                                       
    Column A is stored in 2 pages and Column B is      
                  stored in 3 pages                    
                                                       
                                                       
                                                      
   ```
   
                                                             
   Given the predicate `A > 35 `, we could rule out page 1 using statistics  (max is 20)                                               
                                                             
   This means we can avoid decoding row_index 0->199 for      ColumnChunk B (and skip page 3 entirely)                   
                                                             
   
   ## Multi Column Predicates                                                           
   Given the predicate `A > 35 AND B = "F"`:                    
                                             
   Using `A > 35`, we can rule out page 1 using statistics as before                                                     
                                                              
   Using the `B = "F"` part, we could rule out pages 3 and 5 (row_index 0->99 and row_index 250->onward)                
                                                              
   Thus only rows in indexes 200->249 need to be decoded (only those ranges are needed in the `RowSelector`)
                                                              
                                                              
   



##########
datafusion/core/src/physical_optimizer/pruning.rs:
##########
@@ -223,6 +223,19 @@ impl PruningPredicate {
     pub fn predicate_expr(&self) -> &Arc<dyn PhysicalExpr> {
         &self.predicate_expr
     }
+
+    pub(crate) fn need_input_columns_ids(&self) -> HashSet<usize> {

Review Comment:
   Can we add some comments here explaining what ids these are? Specifically, I think they are the column indexes in the combined schema required to evaluate the pruning predicate, right?



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -749,6 +811,61 @@ macro_rules! get_null_count_values {
     }};
 }
 
+// Extract the min or max value calling `func` from page idex
+macro_rules! get_min_max_values_form_page_index {
+    ($self:expr, $column:expr, $func:ident) => {{
+        if let Some((col_id_index, _field)) =
+            $self.parquet_schema.column_with_name(&$column.name)
+        {
+            if let Some(page_index) = $self.page_indexes.get(col_id_index) {

Review Comment:
   It is so sad that the Page Index doesn't use the same Statistics structure as RowGroupMetadata, but I see this is how it is done in the parquet thrift specification: https://github.com/apache/parquet-format/blame/master/src/main/thrift/parquet.thrift#L938-L971  😒 
   
   
   
   All the statistics in parquet are stored in 
   https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift#L198-L226
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org