You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/03 16:21:44 UTC

[GitHub] [arrow-datafusion] alamb opened a new pull request, #4099: minor: Encapsulate the parquet code a little more

alamb opened a new pull request, #4099:
URL: https://github.com/apache/arrow-datafusion/pull/4099

   # Which issue does this PR close?
   
   re https://github.com/apache/arrow-datafusion/issues/3462
   
   # Rationale for this change
   
   There is a lot more parquet reading code now after filter pushdown in https://github.com/apache/arrow-datafusion/pull/3780 and https://github.com/apache/arrow-datafusion/pull/3967
   
   Let's move it into smaller modules so it is easier to work with
   
   
   # What changes are included in this PR?
   
   1. move `physical_plan/file_format/row_filter.rs` to `physical_plan/file_format/parquet/row_filter.rs` to make it clear this is part of the parquet format implementation
   2. Extract the page level statistics pruning into `datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs`
   2. Extract metrics into `datafusion/core/src/physical_plan/file_format/parquet/metrics.rs`
   
   
   # Are there any user-facing changes?
   
   No
   
   There should be no functional change (none is intended)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] Ted-Jiang merged pull request #4099: minor: Split parquet reader up into smaller modules

Posted by GitBox <gi...@apache.org>.
Ted-Jiang merged PR #4099:
URL: https://github.com/apache/arrow-datafusion/pull/4099


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] Ted-Jiang commented on a diff in pull request #4099: minor: Split parquet reader up into smaller modules

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on code in PR #4099:
URL: https://github.com/apache/arrow-datafusion/pull/4099#discussion_r1013572379


##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -491,77 +436,33 @@ impl FileOpener for ParquetOpener {
                 };
             };
 
+            // Row group pruning: attempt to skip entire row_groups
+            // using metadata on the row groups
             let file_metadata = builder.metadata();
-            let groups = file_metadata.row_groups();
             let row_groups = prune_row_groups(
-                groups,
+                file_metadata.row_groups(),
                 file_range,
                 pruning_predicate.clone(),
                 &file_metrics,
             );
 
-            if enable_page_index {
-                let page_index_predicates = extract_page_index_push_down_predicates(
-                    &pruning_predicate,
-                    builder.schema().clone(),
-                )?;
-                if !page_index_predicates.is_empty() {
-                    let file_offset_indexes = file_metadata.offset_indexes();
-                    let file_page_indexes = file_metadata.page_indexes();
-                    if let (Some(file_offset_indexes), Some(file_page_indexes)) =
-                        (file_offset_indexes, file_page_indexes)
-                    {
-                        let mut row_selections =
-                            VecDeque::with_capacity(page_index_predicates.len());
-                        for predicate in page_index_predicates {
-                            // `extract_page_index_push_down_predicates` only return predicate with one col.
-                            let col_id = *predicate
-                                .need_input_columns_ids()
-                                .iter()
-                                .next()
-                                .unwrap();
-                            let mut selectors = Vec::with_capacity(row_groups.len());
-                            for r in &row_groups {
-                                let rg_offset_indexes = file_offset_indexes.get(*r);
-                                let rg_page_indexes = file_page_indexes.get(*r);
-                                if let (Some(rg_page_indexes), Some(rg_offset_indexes)) =
-                                    (rg_page_indexes, rg_offset_indexes)
-                                {
-                                    selectors.extend(
-                                        prune_pages_in_one_row_group(
-                                            &groups[*r],
-                                            &predicate,
-                                            rg_offset_indexes.get(col_id),
-                                            rg_page_indexes.get(col_id),
-                                            &file_metrics,
-                                        )
-                                            .map_err(|e| {
-                                                ArrowError::ParquetError(format!(
-                                                    "Fail in prune_pages_in_one_row_group: {}",
-                                                    e
-                                                ))
-                                            }),
-                                    );
-                                } else {
-                                    // fallback select all rows
-                                    let all_selected = vec![RowSelector::select(
-                                        groups[*r].num_rows() as usize,
-                                    )];
-                                    selectors.push(all_selected);
-                                }
-                            }
-                            debug!(
-                            "Use filter and page index create RowSelection {:?} from predicate:{:?}",
-                            &selectors, predicate
-                        );
-                            row_selections.push_back(
-                                selectors.into_iter().flatten().collect::<Vec<_>>(),
-                            );
-                        }
-                        let final_selection = combine_multi_col_selection(row_selections);
-                        builder = builder.with_row_selection(final_selection.into());
-                    }
-                }
+            // page index pruning: if all data on individual pages can

Review Comment:
   👍



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] ursabot commented on pull request #4099: minor: Split parquet reader up into smaller modules

Posted by GitBox <gi...@apache.org>.
ursabot commented on PR #4099:
URL: https://github.com/apache/arrow-datafusion/pull/4099#issuecomment-1302917676

   Benchmark runs are scheduled for baseline = fc03001797444ae254fcc9e5196d8389fbc7fdcd and contender = f61b43ad704dc9e9229e1e0c14a4024c48e57c00. f61b43ad704dc9e9229e1e0c14a4024c48e57c00 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] [ec2-t3-xlarge-us-east-2](https://conbench.ursa.dev/compare/runs/051777f3d9e147c2afe77514ad8f4a89...7fae4fdedfa24271bb3708b5b153710c/)
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] [test-mac-arm](https://conbench.ursa.dev/compare/runs/dd0df07e8f1c45b1b8ac211f5d670605...7ca5121021e84e8e842e64f2080ccfaf/)
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] [ursa-i9-9960x](https://conbench.ursa.dev/compare/runs/d2d586aa799a41f2a7e5b3c269cc0450...75f93969ddb24f8cb442ab7f8f3d2214/)
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] [ursa-thinkcentre-m75q](https://conbench.ursa.dev/compare/runs/1dd79af3716d4e05891b85fde68ce906...ca41e9783b1647128cd817a602ed741e/)
   Buildkite builds:
   Supported benchmarks:
   ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
   test-mac-arm: Supported benchmark langs: C++, Python, R
   ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
   ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on pull request #4099: minor: Split parquet reader up into smaller modules

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #4099:
URL: https://github.com/apache/arrow-datafusion/pull/4099#issuecomment-1302591731

   This needs #4101  to pass Ci so marking as draft until then


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #4099: minor: Encapsulate the parquet code a little more

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #4099:
URL: https://github.com/apache/arrow-datafusion/pull/4099#discussion_r1013131122


##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -491,77 +436,33 @@ impl FileOpener for ParquetOpener {
                 };
             };
 
+            // Row group pruning: attempt to skip entire row_groups
+            // using metadata on the row groups
             let file_metadata = builder.metadata();
-            let groups = file_metadata.row_groups();
             let row_groups = prune_row_groups(
-                groups,
+                file_metadata.row_groups(),
                 file_range,
                 pruning_predicate.clone(),
                 &file_metrics,
             );
 
-            if enable_page_index {
-                let page_index_predicates = extract_page_index_push_down_predicates(
-                    &pruning_predicate,
-                    builder.schema().clone(),
-                )?;
-                if !page_index_predicates.is_empty() {
-                    let file_offset_indexes = file_metadata.offset_indexes();
-                    let file_page_indexes = file_metadata.page_indexes();
-                    if let (Some(file_offset_indexes), Some(file_page_indexes)) =
-                        (file_offset_indexes, file_page_indexes)
-                    {
-                        let mut row_selections =
-                            VecDeque::with_capacity(page_index_predicates.len());
-                        for predicate in page_index_predicates {
-                            // `extract_page_index_push_down_predicates` only return predicate with one col.
-                            let col_id = *predicate
-                                .need_input_columns_ids()
-                                .iter()
-                                .next()
-                                .unwrap();
-                            let mut selectors = Vec::with_capacity(row_groups.len());
-                            for r in &row_groups {
-                                let rg_offset_indexes = file_offset_indexes.get(*r);
-                                let rg_page_indexes = file_page_indexes.get(*r);
-                                if let (Some(rg_page_indexes), Some(rg_offset_indexes)) =
-                                    (rg_page_indexes, rg_offset_indexes)
-                                {
-                                    selectors.extend(
-                                        prune_pages_in_one_row_group(
-                                            &groups[*r],
-                                            &predicate,
-                                            rg_offset_indexes.get(col_id),
-                                            rg_page_indexes.get(col_id),
-                                            &file_metrics,
-                                        )
-                                            .map_err(|e| {
-                                                ArrowError::ParquetError(format!(
-                                                    "Fail in prune_pages_in_one_row_group: {}",
-                                                    e
-                                                ))
-                                            }),
-                                    );
-                                } else {
-                                    // fallback select all rows
-                                    let all_selected = vec![RowSelector::select(
-                                        groups[*r].num_rows() as usize,
-                                    )];
-                                    selectors.push(all_selected);
-                                }
-                            }
-                            debug!(
-                            "Use filter and page index create RowSelection {:?} from predicate:{:?}",
-                            &selectors, predicate
-                        );
-                            row_selections.push_back(
-                                selectors.into_iter().flatten().collect::<Vec<_>>(),
-                            );
-                        }
-                        let final_selection = combine_multi_col_selection(row_selections);
-                        builder = builder.with_row_selection(final_selection.into());
-                    }
-                }
+            // page index pruning: if all data on individual pages can

Review Comment:
   The core change of this PR is to refactor `build_page_filter` into a function and move its code into the `page_filter` module



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -2193,65 +1765,6 @@ mod tests {
         ParquetFileMetrics::new(0, "file.parquet", &metrics)
     }
 
-    #[test]

Review Comment:
   moved to `page_filter` module



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #4099: minor: Encapsulate the parquet code a little more

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #4099:
URL: https://github.com/apache/arrow-datafusion/pull/4099#discussion_r1013135630


##########
datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs:
##########
@@ -0,0 +1,514 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Contains code to filter entire pages
+
+use arrow::array::{BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array};
+use arrow::{array::ArrayRef, datatypes::SchemaRef, error::ArrowError};
+use datafusion_common::{Column, DataFusionError, Result};
+use datafusion_expr::utils::expr_to_columns;
+use datafusion_optimizer::utils::split_conjunction;
+use log::{debug, error};
+use parquet::{
+    arrow::arrow_reader::{RowSelection, RowSelector},
+    errors::ParquetError,
+    file::{
+        metadata::{ParquetMetaData, RowGroupMetaData},
+        page_index::index::Index,
+    },
+    format::PageLocation,
+};
+use std::collections::{HashSet, VecDeque};
+use std::sync::Arc;
+
+use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
+
+use super::metrics::ParquetFileMetrics;
+
+/// Create a RowSelection that may rule out ranges of rows based on

Review Comment:
   I polished up some of the existing doc comments to try and explain what this module is doing



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] Ted-Jiang commented on pull request #4099: minor: Split parquet reader up into smaller modules

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on PR #4099:
URL: https://github.com/apache/arrow-datafusion/pull/4099#issuecomment-1302914177

   Looks more readable! Thanks @alamb 👍


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org