You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/01 17:03:21 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #3967: Support pushdown multi-columns in PageIndex pruning.

alamb commented on code in PR #3967:
URL: https://github.com/apache/arrow-datafusion/pull/3967#discussion_r1010674223


##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -1021,72 +1120,57 @@ fn prune_row_groups(
 
 fn prune_pages_in_one_row_group(
     group: &RowGroupMetaData,
-    predicate: Option<PruningPredicate>,
-    offset_indexes: Option<&Vec<Vec<PageLocation>>>,
-    page_indexes: Option<&Vec<Index>>,
+    predicate: &PruningPredicate,
+    col_offset_indexes: Option<&Vec<PageLocation>>,
+    col_page_indexes: Option<&Index>,
     metrics: &ParquetFileMetrics,
 ) -> Result<Vec<RowSelector>> {
     let num_rows = group.num_rows() as usize;
-    if let (Some(predicate), Some(offset_indexes), Some(page_indexes)) =
-        (&predicate, offset_indexes, page_indexes)
+    if let (Some(col_offset_indexes), Some(col_page_indexes)) =
+        (col_offset_indexes, col_page_indexes)
     {
         let pruning_stats = PagesPruningStatistics {
-            page_indexes,
-            offset_indexes,
-            parquet_schema: predicate.schema().as_ref(),
-            // now we assume only support one col.
-            col_id: *predicate
-                .need_input_columns_ids()
-                .iter()
-                .take(1)
-                .next()
-                .unwrap(),
+            col_page_indexes,
+            col_offset_indexes,
         };
 
         match predicate.prune(&pruning_stats) {
             Ok(values) => {
                 let mut vec = Vec::with_capacity(values.len());
-                if let Some(cols_offset_indexes) =
-                    offset_indexes.get(pruning_stats.col_id)
-                {
-                    let row_vec =
-                        create_row_count_in_each_page(cols_offset_indexes, num_rows);
-                    assert_eq!(row_vec.len(), values.len());
-                    let mut sum_row = *row_vec.first().unwrap();
-                    let mut selected = *values.first().unwrap();
-
-                    for (i, &f) in values.iter().skip(1).enumerate() {
-                        if f == selected {
-                            sum_row += *row_vec.get(i).unwrap();
+                let row_vec = create_row_count_in_each_page(col_offset_indexes, num_rows);
+                assert_eq!(row_vec.len(), values.len());
+                let mut sum_row = *row_vec.first().unwrap();
+                let mut selected = *values.first().unwrap();
+
+                for (i, &f) in values.iter().skip(1).enumerate() {
+                    if f == selected {
+                        sum_row += *row_vec.get(i).unwrap();
+                    } else {
+                        let selector = if selected {
+                            RowSelector::select(sum_row)
                         } else {
-                            let selector = if selected {
-                                RowSelector::select(sum_row)
-                            } else {
-                                RowSelector::skip(sum_row)
-                            };
-                            vec.push(selector);
-                            sum_row = *row_vec.get(i).unwrap();
-                            selected = f;
-                        }
+                            RowSelector::skip(sum_row)
+                        };
+                        vec.push(selector);
+                        sum_row = *row_vec.get(i).unwrap();
+                        selected = f;
                     }
+                }
 
-                    let selector = if selected {
-                        RowSelector::select(sum_row)
-                    } else {
-                        RowSelector::skip(sum_row)
-                    };
-                    vec.push(selector);
-                    return Ok(vec);
+                let selector = if selected {
+                    RowSelector::select(sum_row)
                 } else {
-                    debug!("Error evaluating page index predicate values missing page index col_id is{}", pruning_stats.col_id);
-                    metrics.predicate_evaluation_errors.add(1);
-                }
+                    RowSelector::skip(sum_row)
+                };
+                vec.push(selector);
+                return Ok(vec);
             }
             // stats filter array could not be built
-            // return a closure which will not filter out any row groups
+            // return a result which will not filter out any pages
             Err(e) => {
-                debug!("Error evaluating page index predicate values {}", e);
+                error!("Error evaluating page index predicate values {}", e);
                 metrics.predicate_evaluation_errors.add(1);
+                return Ok(vec![RowSelector::select(group.num_rows() as usize)]);

Review Comment:
   I verified that this PR fixes #4002. I have also prepared a PR to enable page filtering in the integration test (https://github.com/apache/arrow-datafusion/pull/4062), which we can run once this PR is merged.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org