You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2024/01/24 18:37:25 UTC
(arrow-datafusion) branch main updated: Find the correct fields when using page filter on `struct` fields in parquet (#8848)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 7ad929a117 Find the correct fields when using page filter on `struct` fields in parquet (#8848)
7ad929a117 is described below
commit 7ad929a117342b425631138693826b57c5346ac6
Author: manoj-inukolunu <54...@users.noreply.github.com>
AuthorDate: Thu Jan 25 00:07:17 2024 +0530
Find the correct fields when using page filter on `struct` fields in parquet (#8848)
* Don't consider struct fields for filtering in parquet
* use parquet_column instead of find_column_index.
* Remove unused struct
* Fix formatting issues.
* Simplify struct field resolution
* fix formatting
* fmt
---------
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
.../src/datasource/physical_plan/parquet/mod.rs | 80 ++++++++++++++++++++--
.../physical_plan/parquet/page_filter.rs | 39 ++++-------
2 files changed, 90 insertions(+), 29 deletions(-)
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index c2689cfb10..7215cdd607 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -549,8 +549,13 @@ impl FileOpener for ParquetOpener {
// with that range can be skipped as well
if enable_page_index && !row_groups.is_empty() {
if let Some(p) = page_pruning_predicate {
- let pruned =
- p.prune(&row_groups, file_metadata.as_ref(), &file_metrics)?;
+ let pruned = p.prune(
+ &file_schema,
+ builder.parquet_schema(),
+ &row_groups,
+ file_metadata.as_ref(),
+ &file_metrics,
+ )?;
if let Some(row_selection) = pruned {
builder = builder.with_row_selection(row_selection);
}
@@ -782,7 +787,8 @@ mod tests {
array::{Int64Array, Int8Array, StringArray},
datatypes::{DataType, Field, SchemaBuilder},
};
- use arrow_array::Date64Array;
+ use arrow_array::{Date64Array, StructArray};
+ use arrow_schema::Fields;
use chrono::{TimeZone, Utc};
use datafusion_common::{assert_contains, ToDFSchema};
use datafusion_common::{FileType, GetExt, ScalarValue};
@@ -793,6 +799,7 @@ mod tests {
use object_store::local::LocalFileSystem;
use object_store::path::Path;
use object_store::ObjectMeta;
+ use parquet::arrow::ArrowWriter;
use std::fs::{self, File};
use std::io::Write;
use tempfile::TempDir;
@@ -1765,12 +1772,14 @@ mod tests {
// assert the batches and some metrics
#[rustfmt::skip]
- let expected = ["+-----+",
+ let expected = [
+ "+-----+",
"| int |",
"+-----+",
"| 4 |",
"| 5 |",
- "+-----+"];
+ "+-----+"
+ ];
assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
assert_eq!(get_value(&metrics, "page_index_rows_filtered"), 4);
assert!(
@@ -2136,4 +2145,65 @@ mod tests {
let execution_props = ExecutionProps::new();
create_physical_expr(expr, &df_schema, &execution_props).unwrap()
}
+
+ #[tokio::test]
+ async fn test_struct_filter_parquet() -> Result<()> {
+ let tmp_dir = TempDir::new()?;
+ let path = tmp_dir.path().to_str().unwrap().to_string() + "/test.parquet";
+ write_file(&path);
+ let ctx = SessionContext::new();
+ let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
+ ctx.register_listing_table("base_table", path, opt, None, None)
+ .await
+ .unwrap();
+ let sql = "select * from base_table where name='test02'";
+ let batch = ctx.sql(sql).await.unwrap().collect().await.unwrap();
+ assert_eq!(batch.len(), 1);
+ let expected = [
+ "+---------------------+----+--------+",
+ "| struct | id | name |",
+ "+---------------------+----+--------+",
+ "| {id: 4, name: aaa2} | 2 | test02 |",
+ "+---------------------+----+--------+",
+ ];
+ crate::assert_batches_eq!(expected, &batch);
+ Ok(())
+ }
+
+ fn write_file(file: &String) {
+ let struct_fields = Fields::from(vec![
+ Field::new("id", DataType::Int64, false),
+ Field::new("name", DataType::Utf8, false),
+ ]);
+ let schema = Schema::new(vec![
+ Field::new("struct", DataType::Struct(struct_fields.clone()), false),
+ Field::new("id", DataType::Int64, true),
+ Field::new("name", DataType::Utf8, false),
+ ]);
+ let id_array = Int64Array::from(vec![Some(1), Some(2)]);
+ let columns = vec![
+ Arc::new(Int64Array::from(vec![3, 4])) as _,
+ Arc::new(StringArray::from(vec!["aaa1", "aaa2"])) as _,
+ ];
+ let struct_array = StructArray::new(struct_fields, columns, None);
+
+ let name_array = StringArray::from(vec![Some("test01"), Some("test02")]);
+ let schema = Arc::new(schema);
+
+ let batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(struct_array),
+ Arc::new(id_array),
+ Arc::new(name_array),
+ ],
+ )
+ .unwrap();
+ let file = File::create(file).unwrap();
+ let w_opt = WriterProperties::builder().build();
+ let mut writer = ArrowWriter::try_new(file, schema, Some(w_opt)).unwrap();
+ writer.write(&batch).unwrap();
+ writer.flush().unwrap();
+ writer.close().unwrap();
+ }
}
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
index a0637f3796..f0a8e66089 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
@@ -23,11 +23,12 @@ use arrow::array::{
};
use arrow::datatypes::DataType;
use arrow::{array::ArrayRef, datatypes::SchemaRef, error::ArrowError};
+use arrow_schema::Schema;
use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{split_conjunction, PhysicalExpr};
use log::{debug, trace};
-use parquet::schema::types::ColumnDescriptor;
+use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor};
use parquet::{
arrow::arrow_reader::{RowSelection, RowSelector},
errors::ParquetError,
@@ -41,7 +42,9 @@ use std::collections::HashSet;
use std::sync::Arc;
use crate::datasource::physical_plan::parquet::parquet_to_arrow_decimal_type;
-use crate::datasource::physical_plan::parquet::statistics::from_bytes_to_i128;
+use crate::datasource::physical_plan::parquet::statistics::{
+ from_bytes_to_i128, parquet_column,
+};
use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use super::metrics::ParquetFileMetrics;
@@ -128,6 +131,8 @@ impl PagePruningPredicate {
/// Returns a [`RowSelection`] for the given file
pub fn prune(
&self,
+ arrow_schema: &Schema,
+ parquet_schema: &SchemaDescriptor,
row_groups: &[usize],
file_metadata: &ParquetMetaData,
file_metrics: &ParquetFileMetrics,
@@ -163,9 +168,8 @@ impl PagePruningPredicate {
let mut row_selections = Vec::with_capacity(page_index_predicates.len());
for predicate in page_index_predicates {
- // find column index by looking in the row group metadata.
- let col_idx = find_column_index(predicate, &groups[0]);
-
+ // find column index in the parquet schema
+ let col_idx = find_column_index(predicate, arrow_schema, parquet_schema);
let mut selectors = Vec::with_capacity(row_groups.len());
for r in row_groups.iter() {
let row_group_metadata = &groups[*r];
@@ -231,7 +235,7 @@ impl PagePruningPredicate {
}
}
-/// Returns the column index in the row group metadata for the single
+/// Returns the column index in the parquet schema for the single
/// column of a single column pruning predicate.
///
/// For example, give the predicate `y > 5`
@@ -246,12 +250,12 @@ impl PagePruningPredicate {
/// Panics:
///
/// If the predicate contains more than one column reference (assumes
-/// that `extract_page_index_push_down_predicates` only return
+/// that `extract_page_index_push_down_predicates` only returns
/// predicate with one col)
-///
fn find_column_index(
predicate: &PruningPredicate,
- row_group_metadata: &RowGroupMetaData,
+ arrow_schema: &Schema,
+ parquet_schema: &SchemaDescriptor,
) -> Option<usize> {
let mut found_required_column: Option<&Column> = None;
@@ -269,25 +273,12 @@ fn find_column_index(
}
}
- let column = if let Some(found_required_column) = found_required_column.as_ref() {
- found_required_column
- } else {
+ let Some(column) = found_required_column.as_ref() else {
trace!("No column references in pruning predicate");
return None;
};
- let col_idx = row_group_metadata
- .columns()
- .iter()
- .enumerate()
- .find(|(_idx, c)| c.column_descr().name() == column.name())
- .map(|(idx, _c)| idx);
-
- if col_idx.is_none() {
- trace!("Can not find column {} in row group meta", column.name());
- }
-
- col_idx
+ parquet_column(parquet_schema, arrow_schema, column.name()).map(|x| x.0)
}
/// Intersects the [`RowSelector`]s