Posted to commits@arrow.apache.org by al...@apache.org on 2024/01/24 18:37:25 UTC

(arrow-datafusion) branch main updated: Find the correct fields when using page filter on `struct` fields in parquet (#8848)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 7ad929a117 Find the correct fields when using page filter on `struct` fields in parquet (#8848)
7ad929a117 is described below

commit 7ad929a117342b425631138693826b57c5346ac6
Author: manoj-inukolunu <54...@users.noreply.github.com>
AuthorDate: Thu Jan 25 00:07:17 2024 +0530

    Find the correct fields when using page filter on `struct` fields in parquet (#8848)
    
    * Don't consider struct fields for filtering in parquet
    
    * use parquet_column instead of find_column_index.
    
    * Remove unused struct
    
    * Fix formatting issues.
    
    * Simplify struct field resolution
    
    * fix formatting
    
    * fmt
    
    ---------
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
 .../src/datasource/physical_plan/parquet/mod.rs    | 80 ++++++++++++++++++++--
 .../physical_plan/parquet/page_filter.rs           | 39 ++++-------
 2 files changed, 90 insertions(+), 29 deletions(-)
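
Background on the bug: parquet flattens nested types into leaf columns, and `ColumnDescriptor::name()` returns only the short leaf name. Under the schema used by the new `test_struct_filter_parquet` test below, the leaves `struct.name` and `name` therefore share the name `name`, and the old lookup in `find_column_index`, which matched row-group columns by that short name, resolved to the wrong leaf. A minimal sketch (not part of the commit; uses `arrow_to_parquet_schema` from the parquet crate) that makes the collision visible:

    use arrow_schema::{DataType, Field, Fields, Schema};
    use parquet::arrow::arrow_to_parquet_schema;

    fn main() {
        // Same shape as the schema written by write_file() in the new test.
        let struct_fields = Fields::from(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
        ]);
        let schema = Schema::new(vec![
            Field::new("struct", DataType::Struct(struct_fields), false),
            Field::new("id", DataType::Int64, true),
            Field::new("name", DataType::Utf8, false),
        ]);
        let descr = arrow_to_parquet_schema(&schema).unwrap();
        for (idx, col) in descr.columns().iter().enumerate() {
            // Prints leaves 0/1 as struct.id / struct.name and leaves 2/3 as
            // the root id / name: the short leaf name repeats, and only the
            // full path disambiguates.
            println!("{idx}: name={}, path={}", col.name(), col.path());
        }
    }

Matching on the short leaf name alone finds `struct.name` (leaf 1) before the root `name` column (leaf 3), so the page filter consulted page statistics for the wrong column.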

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index c2689cfb10..7215cdd607 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -549,8 +549,13 @@ impl FileOpener for ParquetOpener {
             // with that range can be skipped as well
             if enable_page_index && !row_groups.is_empty() {
                 if let Some(p) = page_pruning_predicate {
-                    let pruned =
-                        p.prune(&row_groups, file_metadata.as_ref(), &file_metrics)?;
+                    let pruned = p.prune(
+                        &file_schema,
+                        builder.parquet_schema(),
+                        &row_groups,
+                        file_metadata.as_ref(),
+                        &file_metrics,
+                    )?;
                     if let Some(row_selection) = pruned {
                         builder = builder.with_row_selection(row_selection);
                     }
@@ -782,7 +787,8 @@ mod tests {
         array::{Int64Array, Int8Array, StringArray},
         datatypes::{DataType, Field, SchemaBuilder},
     };
-    use arrow_array::Date64Array;
+    use arrow_array::{Date64Array, StructArray};
+    use arrow_schema::Fields;
     use chrono::{TimeZone, Utc};
     use datafusion_common::{assert_contains, ToDFSchema};
     use datafusion_common::{FileType, GetExt, ScalarValue};
@@ -793,6 +799,7 @@ mod tests {
     use object_store::local::LocalFileSystem;
     use object_store::path::Path;
     use object_store::ObjectMeta;
+    use parquet::arrow::ArrowWriter;
     use std::fs::{self, File};
     use std::io::Write;
     use tempfile::TempDir;
@@ -1765,12 +1772,14 @@ mod tests {
 
         // assert the batches and some metrics
         #[rustfmt::skip]
-        let expected = ["+-----+",
+        let expected = [
+            "+-----+",
             "| int |",
             "+-----+",
             "| 4   |",
             "| 5   |",
-            "+-----+"];
+            "+-----+"
+        ];
         assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
         assert_eq!(get_value(&metrics, "page_index_rows_filtered"), 4);
         assert!(
@@ -2136,4 +2145,65 @@ mod tests {
         let execution_props = ExecutionProps::new();
         create_physical_expr(expr, &df_schema, &execution_props).unwrap()
     }
+
+    #[tokio::test]
+    async fn test_struct_filter_parquet() -> Result<()> {
+        let tmp_dir = TempDir::new()?;
+        let path = tmp_dir.path().to_str().unwrap().to_string() + "/test.parquet";
+        write_file(&path);
+        let ctx = SessionContext::new();
+        let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
+        ctx.register_listing_table("base_table", path, opt, None, None)
+            .await
+            .unwrap();
+        let sql = "select * from base_table where name='test02'";
+        let batch = ctx.sql(sql).await.unwrap().collect().await.unwrap();
+        assert_eq!(batch.len(), 1);
+        let expected = [
+            "+---------------------+----+--------+",
+            "| struct              | id | name   |",
+            "+---------------------+----+--------+",
+            "| {id: 4, name: aaa2} | 2  | test02 |",
+            "+---------------------+----+--------+",
+        ];
+        crate::assert_batches_eq!(expected, &batch);
+        Ok(())
+    }
+
+    fn write_file(file: &String) {
+        let struct_fields = Fields::from(vec![
+            Field::new("id", DataType::Int64, false),
+            Field::new("name", DataType::Utf8, false),
+        ]);
+        let schema = Schema::new(vec![
+            Field::new("struct", DataType::Struct(struct_fields.clone()), false),
+            Field::new("id", DataType::Int64, true),
+            Field::new("name", DataType::Utf8, false),
+        ]);
+        let id_array = Int64Array::from(vec![Some(1), Some(2)]);
+        let columns = vec![
+            Arc::new(Int64Array::from(vec![3, 4])) as _,
+            Arc::new(StringArray::from(vec!["aaa1", "aaa2"])) as _,
+        ];
+        let struct_array = StructArray::new(struct_fields, columns, None);
+
+        let name_array = StringArray::from(vec![Some("test01"), Some("test02")]);
+        let schema = Arc::new(schema);
+
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(struct_array),
+                Arc::new(id_array),
+                Arc::new(name_array),
+            ],
+        )
+        .unwrap();
+        let file = File::create(file).unwrap();
+        let w_opt = WriterProperties::builder().build();
+        let mut writer = ArrowWriter::try_new(file, schema, Some(w_opt)).unwrap();
+        writer.write(&batch).unwrap();
+        writer.flush().unwrap();
+        writer.close().unwrap();
+    }
 }
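
Outside the test suite, the fix can be observed with `EXPLAIN ANALYZE`. A hedged sketch: the file path is hypothetical and assumed to hold data written with the same schema as `write_file` above, and page-index pruning must be enabled (the `datafusion.execution.parquet.enable_page_index` setting):

    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> datafusion::error::Result<()> {
        let ctx = SessionContext::new();
        // Hypothetical file with the same layout as the new test's write_file().
        ctx.register_parquet("base_table", "/tmp/test.parquet", ParquetReadOptions::default())
            .await?;
        // With the fix, pruning uses the page index of the top-level `name`
        // column rather than the `struct.name` leaf; the metrics shown in the
        // plan (e.g. page_index_rows_filtered, as asserted in the tests above)
        // reflect that.
        ctx.sql("EXPLAIN ANALYZE select * from base_table where name='test02'")
            .await?
            .show()
            .await?;
        Ok(())
    }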
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
index a0637f3796..f0a8e66089 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
@@ -23,11 +23,12 @@ use arrow::array::{
 };
 use arrow::datatypes::DataType;
 use arrow::{array::ArrayRef, datatypes::SchemaRef, error::ArrowError};
+use arrow_schema::Schema;
 use datafusion_common::{DataFusionError, Result, ScalarValue};
 use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::{split_conjunction, PhysicalExpr};
 use log::{debug, trace};
-use parquet::schema::types::ColumnDescriptor;
+use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor};
 use parquet::{
     arrow::arrow_reader::{RowSelection, RowSelector},
     errors::ParquetError,
@@ -41,7 +42,9 @@ use std::collections::HashSet;
 use std::sync::Arc;
 
 use crate::datasource::physical_plan::parquet::parquet_to_arrow_decimal_type;
-use crate::datasource::physical_plan::parquet::statistics::from_bytes_to_i128;
+use crate::datasource::physical_plan::parquet::statistics::{
+    from_bytes_to_i128, parquet_column,
+};
 use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
 
 use super::metrics::ParquetFileMetrics;
@@ -128,6 +131,8 @@ impl PagePruningPredicate {
     /// Returns a [`RowSelection`] for the given file
     pub fn prune(
         &self,
+        arrow_schema: &Schema,
+        parquet_schema: &SchemaDescriptor,
         row_groups: &[usize],
         file_metadata: &ParquetMetaData,
         file_metrics: &ParquetFileMetrics,
@@ -163,9 +168,8 @@ impl PagePruningPredicate {
 
         let mut row_selections = Vec::with_capacity(page_index_predicates.len());
         for predicate in page_index_predicates {
-            // find column index by looking in the row group metadata.
-            let col_idx = find_column_index(predicate, &groups[0]);
-
+            // find column index in the parquet schema
+            let col_idx = find_column_index(predicate, arrow_schema, parquet_schema);
             let mut selectors = Vec::with_capacity(row_groups.len());
             for r in row_groups.iter() {
                 let row_group_metadata = &groups[*r];
@@ -231,7 +235,7 @@ impl PagePruningPredicate {
     }
 }
 
-/// Returns the column index in the row group metadata for the single
+/// Returns the column index in the parquet schema for the single
 /// column of a single column pruning predicate.
 ///
 /// For example, given the predicate `y > 5`
@@ -246,12 +250,12 @@ impl PagePruningPredicate {
 /// Panics:
 ///
 /// If the predicate contains more than one column reference (assumes
-/// that `extract_page_index_push_down_predicates` only return
+/// that `extract_page_index_push_down_predicates` only returns
 /// predicate with one col)
-///
 fn find_column_index(
     predicate: &PruningPredicate,
-    row_group_metadata: &RowGroupMetaData,
+    arrow_schema: &Schema,
+    parquet_schema: &SchemaDescriptor,
 ) -> Option<usize> {
     let mut found_required_column: Option<&Column> = None;
 
@@ -269,25 +273,12 @@ fn find_column_index(
         }
     }
 
-    let column = if let Some(found_required_column) = found_required_column.as_ref() {
-        found_required_column
-    } else {
+    let Some(column) = found_required_column.as_ref() else {
         trace!("No column references in pruning predicate");
         return None;
     };
 
-    let col_idx = row_group_metadata
-        .columns()
-        .iter()
-        .enumerate()
-        .find(|(_idx, c)| c.column_descr().name() == column.name())
-        .map(|(idx, _c)| idx);
-
-    if col_idx.is_none() {
-        trace!("Can not find column {} in row group meta", column.name());
-    }
-
-    col_idx
+    parquet_column(parquet_schema, arrow_schema, column.name()).map(|x| x.0)
 }
 
 /// Intersects the [`RowSelector`]s
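
For reference, the `parquet_column` helper that replaces the name match (it lives in the datasource's parquet `statistics.rs` module) resolves the predicate column against the root Arrow fields first, and only then maps that root to a parquet leaf. A rough, hypothetical re-sketch of that idea (the real helper also returns the matched field and rejects nested types; details may differ):

    use arrow_schema::Schema;
    use parquet::schema::types::SchemaDescriptor;

    // Hypothetical name for illustration; mirrors parquet_column(...).map(|x| x.0).
    fn parquet_leaf_index(
        parquet_schema: &SchemaDescriptor,
        arrow_schema: &Schema,
        name: &str,
    ) -> Option<usize> {
        // Resolve against the *root* Arrow fields only, so a nested leaf such
        // as struct.name can no longer shadow the root column `name`.
        let root_idx = arrow_schema.index_of(name).ok()?;
        // Then find the parquet leaf whose root schema element is that field.
        (0..parquet_schema.num_columns())
            .find(|&leaf| parquet_schema.get_column_root_idx(leaf) == root_idx)
    }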