Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/10 12:33:39 UTC
[arrow-datafusion] branch master updated: Minor: Extract parquet row group pruning code into its own module (#4160)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 4d621c5c0 Minor: Extract parquet row group pruning code into its own module (#4160)
4d621c5c0 is described below
commit 4d621c5c0da387f50c9ab6f3a9ee568062ee9976
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Thu Nov 10 07:33:34 2022 -0500
Minor: Extract parquet row group pruning code into its own module (#4160)
---
.../core/src/physical_plan/file_format/parquet.rs | 729 +-------------------
.../file_format/parquet/row_groups.rs | 744 +++++++++++++++++++++
2 files changed, 755 insertions(+), 718 deletions(-)
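For readers skimming the diff: the moved code decides, for each row group in a parquet file, whether the group's min/max statistics rule out every row matching the query predicate; only the surviving group indices are scanned. Below is a minimal standalone sketch of that pruning loop. It is an illustration only: the `Stats` struct and the predicate closure are stand-ins for parquet's RowGroupMetaData and DataFusion's PruningPredicate, which the real code further down uses.

// Minimal sketch of statistics-based row group pruning
// (illustrative stand-ins, not the real DataFusion/parquet types).
struct Stats {
    min: Option<i32>,
    max: Option<i32>,
}

/// Return the indices of the row groups that may contain matching rows.
/// `predicate` answers "can this group possibly match?", or None when the
/// statistics are insufficient to decide.
fn prune(groups: &[Stats], predicate: impl Fn(&Stats) -> Option<bool>) -> Vec<usize> {
    let mut kept = Vec::with_capacity(groups.len());
    for (idx, stats) in groups.iter().enumerate() {
        // Only a definite "cannot match" skips a group; when the statistics
        // are insufficient (None), the group is kept, as in the real code.
        if predicate(stats).unwrap_or(true) {
            kept.push(idx);
        }
    }
    kept
}

fn main() {
    let groups = [
        Stats { min: Some(1), max: Some(10) },  // c1 in [1, 10]
        Stats { min: Some(11), max: Some(20) }, // c1 in [11, 20]
        Stats { min: None, max: None },         // no statistics
    ];
    // c1 > 15 can only match if the group's max exceeds 15.
    assert_eq!(prune(&groups, |s| s.max.map(|max| max > 15)), vec![1, 2]);
    // c1 < 5 can only match if the group's min is below 5.
    assert_eq!(prune(&groups, |s| s.min.map(|min| min < 5)), vec![0, 2]);
}

The real prune_row_groups, shown in full below, additionally skips row groups whose first column chunk lies outside the partition's FileRange and records pruned groups and evaluation errors in ParquetFileMetrics.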
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 36c8392db..cf8763b07 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -17,18 +17,18 @@
//! Execution plan for reading Parquet files
+use arrow::datatypes::SchemaRef;
use fmt::Debug;
+use std::any::Any;
use std::fmt;
use std::fs;
use std::ops::Range;
use std::sync::Arc;
-use std::{any::Any, convert::TryInto};
use crate::config::OPT_PARQUET_ENABLE_PAGE_INDEX;
use crate::config::OPT_PARQUET_PUSHDOWN_FILTERS;
use crate::config::OPT_PARQUET_REORDER_FILTERS;
use crate::datasource::file_format::parquet::fetch_parquet_metadata;
-use crate::datasource::listing::FileRange;
use crate::physical_plan::file_format::file_stream::{
FileOpenFuture, FileOpener, FileStream,
};
@@ -36,7 +36,7 @@ use crate::physical_plan::file_format::FileMeta;
use crate::{
error::{DataFusionError, Result},
execution::context::{SessionState, TaskContext},
- physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
+ physical_optimizer::pruning::PruningPredicate,
physical_plan::{
expressions::PhysicalSortExpr,
file_format::{FileScanConfig, SchemaAdapter},
@@ -44,16 +44,9 @@ use crate::{
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
Statistics,
},
- scalar::ScalarValue,
-};
-use arrow::datatypes::DataType;
-use arrow::{
- array::ArrayRef,
- datatypes::{Schema, SchemaRef},
- error::ArrowError,
};
+use arrow::error::ArrowError;
use bytes::Bytes;
-use datafusion_common::Column;
use datafusion_expr::Expr;
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
@@ -62,18 +55,13 @@ use object_store::{ObjectMeta, ObjectStore};
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask};
-use parquet::basic::{ConvertedType, LogicalType};
use parquet::errors::ParquetError;
-use parquet::file::{
- metadata::{ParquetMetaData, RowGroupMetaData},
- properties::WriterProperties,
- statistics::Statistics as ParquetStatistics,
-};
-use parquet::schema::types::ColumnDescriptor;
+use parquet::file::{metadata::ParquetMetaData, properties::WriterProperties};
mod metrics;
mod page_filter;
mod row_filter;
+mod row_groups;
pub use metrics::ParquetFileMetrics;
@@ -434,7 +422,7 @@ impl FileOpener for ParquetOpener {
// Row group pruning: attempt to skip entire row_groups
// using metadata on the row groups
let file_metadata = builder.metadata();
- let row_groups = prune_row_groups(
+ let row_groups = row_groups::prune_row_groups(
file_metadata.row_groups(),
file_range,
pruning_predicate.clone(),
@@ -597,224 +585,6 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
}
}
-/// Wraps parquet statistics in a way
-/// that implements [`PruningStatistics`]
-struct RowGroupPruningStatistics<'a> {
- row_group_metadata: &'a RowGroupMetaData,
- parquet_schema: &'a Schema,
-}
-
-// TODO: consolidate code with arrow-rs
-// Convert the byte array to i128.
-// The input byte array must be big-endian.
-// Copied from arrow-rs
-fn from_bytes_to_i128(b: &[u8]) -> i128 {
- assert!(b.len() <= 16, "Decimal128Array supports only up to size 16");
- let first_bit = b[0] & 128u8 == 128u8;
- let mut result = if first_bit { [255u8; 16] } else { [0u8; 16] };
- for (i, v) in b.iter().enumerate() {
- result[i + (16 - b.len())] = *v;
- }
- // The byte array comes from the parquet file and must be big-endian.
- // The endianness is defined by the parquet format; see the reference document
- // https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66
- i128::from_be_bytes(result)
-}
-
-/// Extract the min/max statistics from a `ParquetStatistics` object
-macro_rules! get_statistic {
- ($column_statistics:expr, $func:ident, $bytes_func:ident, $target_arrow_type:expr) => {{
- if !$column_statistics.has_min_max_set() {
- return None;
- }
- match $column_statistics {
- ParquetStatistics::Boolean(s) => Some(ScalarValue::Boolean(Some(*s.$func()))),
- ParquetStatistics::Int32(s) => {
- match $target_arrow_type {
- // int32 to decimal with the precision and scale
- Some(DataType::Decimal128(precision, scale)) => {
- Some(ScalarValue::Decimal128(
- Some(*s.$func() as i128),
- precision,
- scale,
- ))
- }
- _ => Some(ScalarValue::Int32(Some(*s.$func()))),
- }
- }
- ParquetStatistics::Int64(s) => {
- match $target_arrow_type {
- // int64 to decimal with the precision and scale
- Some(DataType::Decimal128(precision, scale)) => {
- Some(ScalarValue::Decimal128(
- Some(*s.$func() as i128),
- precision,
- scale,
- ))
- }
- _ => Some(ScalarValue::Int64(Some(*s.$func()))),
- }
- }
- // 96 bit ints not supported
- ParquetStatistics::Int96(_) => None,
- ParquetStatistics::Float(s) => Some(ScalarValue::Float32(Some(*s.$func()))),
- ParquetStatistics::Double(s) => Some(ScalarValue::Float64(Some(*s.$func()))),
- ParquetStatistics::ByteArray(s) => {
- // TODO support decimal type for byte array type
- let s = std::str::from_utf8(s.$bytes_func())
- .map(|s| s.to_string())
- .ok();
- Some(ScalarValue::Utf8(s))
- }
- // type not supported yet
- ParquetStatistics::FixedLenByteArray(s) => {
- match $target_arrow_type {
- // just support the decimal data type
- Some(DataType::Decimal128(precision, scale)) => {
- Some(ScalarValue::Decimal128(
- Some(from_bytes_to_i128(s.$bytes_func())),
- precision,
- scale,
- ))
- }
- _ => None,
- }
- }
- }
- }};
-}
-
-// Extract the min or max value by calling `func` or `bytes_func` on the ParquetStatistics as appropriate
-macro_rules! get_min_max_values {
- ($self:expr, $column:expr, $func:ident, $bytes_func:ident) => {{
- let (_column_index, field) =
- if let Some((v, f)) = $self.parquet_schema.column_with_name(&$column.name) {
- (v, f)
- } else {
- // Named column was not present
- return None;
- };
-
- let data_type = field.data_type();
- // The result may be None, because DataFusion doesn't have support for ScalarValues of the column type
- let null_scalar: ScalarValue = data_type.try_into().ok()?;
-
- $self.row_group_metadata
- .columns()
- .iter()
- .find(|c| c.column_descr().name() == &$column.name)
- .and_then(|c| if c.statistics().is_some() {Some((c.statistics().unwrap(), c.column_descr()))} else {None})
- .map(|(stats, column_descr)|
- {
- let target_data_type = parquet_to_arrow_decimal_type(column_descr);
- get_statistic!(stats, $func, $bytes_func, target_data_type)
- })
- .flatten()
- // column either didn't have statistics at all or didn't have min/max values
- .or_else(|| Some(null_scalar.clone()))
- .map(|s| s.to_array())
- }}
-}
-
-// Extract the null count value from the ParquetStatistics
-macro_rules! get_null_count_values {
- ($self:expr, $column:expr) => {{
- let value = ScalarValue::UInt64(
- if let Some(col) = $self
- .row_group_metadata
- .columns()
- .iter()
- .find(|c| c.column_descr().name() == &$column.name)
- {
- col.statistics().map(|s| s.null_count())
- } else {
- Some($self.row_group_metadata.num_rows() as u64)
- },
- );
-
- Some(value.to_array())
- }};
-}
-
-// Convert the parquet column schema to an arrow data type, considering only
-// the decimal data type.
-fn parquet_to_arrow_decimal_type(parquet_column: &ColumnDescriptor) -> Option<DataType> {
- let type_ptr = parquet_column.self_type_ptr();
- match type_ptr.get_basic_info().logical_type() {
- Some(LogicalType::Decimal { scale, precision }) => {
- Some(DataType::Decimal128(precision as u8, scale as u8))
- }
- _ => match type_ptr.get_basic_info().converted_type() {
- ConvertedType::DECIMAL => Some(DataType::Decimal128(
- type_ptr.get_precision() as u8,
- type_ptr.get_scale() as u8,
- )),
- _ => None,
- },
- }
-}
-
-impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
- fn min_values(&self, column: &Column) -> Option<ArrayRef> {
- get_min_max_values!(self, column, min, min_bytes)
- }
-
- fn max_values(&self, column: &Column) -> Option<ArrayRef> {
- get_min_max_values!(self, column, max, max_bytes)
- }
-
- fn num_containers(&self) -> usize {
- 1
- }
-
- fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
- get_null_count_values!(self, column)
- }
-}
-
-fn prune_row_groups(
- groups: &[RowGroupMetaData],
- range: Option<FileRange>,
- predicate: Option<PruningPredicate>,
- metrics: &ParquetFileMetrics,
-) -> Vec<usize> {
- // TODO: Columnar pruning
- let mut filtered = Vec::with_capacity(groups.len());
- for (idx, metadata) in groups.iter().enumerate() {
- if let Some(range) = &range {
- let offset = metadata.column(0).file_offset();
- if offset < range.start || offset >= range.end {
- continue;
- }
- }
-
- if let Some(predicate) = &predicate {
- let pruning_stats = RowGroupPruningStatistics {
- row_group_metadata: metadata,
- parquet_schema: predicate.schema().as_ref(),
- };
- match predicate.prune(&pruning_stats) {
- Ok(values) => {
- // NB: false means don't scan row group
- if !values[0] {
- metrics.row_groups_pruned.add(1);
- continue;
- }
- }
- // stats filter array could not be built
- // in that case, do not filter out the row group
- Err(e) => {
- debug!("Error evaluating row group predicate values {}", e);
- metrics.predicate_evaluation_errors.add(1);
- }
- }
- }
-
- filtered.push(idx)
- }
- filtered
-}
-
/// Executes a query and writes the results to a partitioned Parquet file.
pub async fn plan_to_parquet(
state: &SessionState,
@@ -881,8 +651,8 @@ mod tests {
datasource::file_format::{parquet::ParquetFormat, FileFormat},
physical_plan::collect,
};
- use arrow::array::{Float32Array, Int32Array};
- use arrow::datatypes::DataType::Decimal128;
+ use arrow::array::{ArrayRef, Float32Array, Int32Array};
+ use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
use arrow::{
array::{Int64Array, Int8Array, StringArray},
@@ -890,18 +660,12 @@ mod tests {
};
use chrono::{TimeZone, Utc};
use datafusion_common::assert_contains;
- use datafusion_expr::{cast, col, lit};
+ use datafusion_common::ScalarValue;
+ use datafusion_expr::{col, lit};
use futures::StreamExt;
use object_store::local::LocalFileSystem;
use object_store::path::Path;
use object_store::ObjectMeta;
- use parquet::basic::LogicalType;
- use parquet::data_type::{ByteArray, FixedLenByteArray};
- use parquet::{
- basic::Type as PhysicalType,
- file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics},
- schema::types::SchemaDescPtr,
- };
use std::fs::File;
use std::io::Write;
use tempfile::TempDir;
@@ -1756,477 +1520,6 @@ mod tests {
}
}
- fn parquet_file_metrics() -> ParquetFileMetrics {
- let metrics = Arc::new(ExecutionPlanMetricsSet::new());
- ParquetFileMetrics::new(0, "file.parquet", &metrics)
- }
-
- #[test]
- fn row_group_pruning_predicate_simple_expr() {
- use datafusion_expr::{col, lit};
- // c1 > 15 => c1_max > 15
- let expr = col("c1").gt(lit(15));
- let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
- let pruning_predicate =
- PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
- let schema_descr = get_test_schema_descr(vec![(
- "c1",
- PhysicalType::INT32,
- None,
- None,
- None,
- None,
- )]);
- let rgm1 = get_row_group_meta_data(
- &schema_descr,
- vec![ParquetStatistics::int32(Some(1), Some(10), None, 0, false)],
- );
- let rgm2 = get_row_group_meta_data(
- &schema_descr,
- vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)],
- );
-
- let metrics = parquet_file_metrics();
- assert_eq!(
- prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
- vec![1]
- );
- }
-
- #[test]
- fn row_group_pruning_predicate_missing_stats() {
- use datafusion_expr::{col, lit};
- // c1 > 15 => c1_max > 15
- let expr = col("c1").gt(lit(15));
- let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
- let pruning_predicate =
- PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
-
- let schema_descr = get_test_schema_descr(vec![(
- "c1",
- PhysicalType::INT32,
- None,
- None,
- None,
- None,
- )]);
- let rgm1 = get_row_group_meta_data(
- &schema_descr,
- vec![ParquetStatistics::int32(None, None, None, 0, false)],
- );
- let rgm2 = get_row_group_meta_data(
- &schema_descr,
- vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)],
- );
- let metrics = parquet_file_metrics();
- // missing statistics for the first row group mean that the result of the predicate expression
- // is null / undefined, so the first row group can't be filtered out
- assert_eq!(
- prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
- vec![0, 1]
- );
- }
-
- #[test]
- fn row_group_pruning_predicate_partial_expr() {
- use datafusion_expr::{col, lit};
- // test row group predicate with partially supported expression
- // c1 > 15 and c2 % 2 => c1_max > 15 and true
- let expr = col("c1").gt(lit(15)).and(col("c2").modulus(lit(2)));
- let schema = Arc::new(Schema::new(vec![
- Field::new("c1", DataType::Int32, false),
- Field::new("c2", DataType::Int32, false),
- ]));
- let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
-
- let schema_descr = get_test_schema_descr(vec![
- ("c1", PhysicalType::INT32, None, None, None, None),
- ("c2", PhysicalType::INT32, None, None, None, None),
- ]);
- let rgm1 = get_row_group_meta_data(
- &schema_descr,
- vec![
- ParquetStatistics::int32(Some(1), Some(10), None, 0, false),
- ParquetStatistics::int32(Some(1), Some(10), None, 0, false),
- ],
- );
- let rgm2 = get_row_group_meta_data(
- &schema_descr,
- vec![
- ParquetStatistics::int32(Some(11), Some(20), None, 0, false),
- ParquetStatistics::int32(Some(11), Some(20), None, 0, false),
- ],
- );
-
- let metrics = parquet_file_metrics();
- let groups = &[rgm1, rgm2];
- // the first row group is still filtered out because the predicate expression can be partially evaluated
- // when conditions are joined using AND
- assert_eq!(
- prune_row_groups(groups, None, Some(pruning_predicate), &metrics),
- vec![1]
- );
-
- // if conditions in predicate are joined with OR and an unsupported expression is used
- // this bypasses the entire predicate expression and no row groups are filtered out
- let expr = col("c1").gt(lit(15)).or(col("c2").modulus(lit(2)));
- let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
-
- // if conditions in predicate are joined with OR and an unsupported expression is used
- // this bypasses the entire predicate expression and no row groups are filtered out
- assert_eq!(
- prune_row_groups(groups, None, Some(pruning_predicate), &metrics),
- vec![0, 1]
- );
- }
-
- fn gen_row_group_meta_data_for_pruning_predicate() -> Vec<RowGroupMetaData> {
- let schema_descr = get_test_schema_descr(vec![
- ("c1", PhysicalType::INT32, None, None, None, None),
- ("c2", PhysicalType::BOOLEAN, None, None, None, None),
- ]);
- let rgm1 = get_row_group_meta_data(
- &schema_descr,
- vec![
- ParquetStatistics::int32(Some(1), Some(10), None, 0, false),
- ParquetStatistics::boolean(Some(false), Some(true), None, 0, false),
- ],
- );
- let rgm2 = get_row_group_meta_data(
- &schema_descr,
- vec![
- ParquetStatistics::int32(Some(11), Some(20), None, 0, false),
- ParquetStatistics::boolean(Some(false), Some(true), None, 1, false),
- ],
- );
- vec![rgm1, rgm2]
- }
-
- #[test]
- fn row_group_pruning_predicate_null_expr() {
- use datafusion_expr::{col, lit};
- // c1 > 15 and IsNull(c2) => c1_max > 15 and c2_null_count > 0
- let expr = col("c1").gt(lit(15)).and(col("c2").is_null());
- let schema = Arc::new(Schema::new(vec![
- Field::new("c1", DataType::Int32, false),
- Field::new("c2", DataType::Boolean, false),
- ]));
- let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
- let groups = gen_row_group_meta_data_for_pruning_predicate();
-
- let metrics = parquet_file_metrics();
- // The first row group is filtered out because it contains no null values in "c2".
- assert_eq!(
- prune_row_groups(&groups, None, Some(pruning_predicate), &metrics),
- vec![1]
- );
- }
-
- #[test]
- fn row_group_pruning_predicate_eq_null_expr() {
- use datafusion_expr::{col, lit};
- // test row group predicate with an unknown (Null) expr
- //
- // c1 > 15 and c2 = NULL => c1_max > 15 and null
- let expr = col("c1")
- .gt(lit(15))
- .and(col("c2").eq(lit(ScalarValue::Boolean(None))));
- let schema = Arc::new(Schema::new(vec![
- Field::new("c1", DataType::Int32, false),
- Field::new("c2", DataType::Boolean, false),
- ]));
- let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
- let groups = gen_row_group_meta_data_for_pruning_predicate();
-
- let metrics = parquet_file_metrics();
- // c2 = NULL always evaluates to NULL (and thus will not
- // pass the predicate). Ideally these should both be false
- assert_eq!(
- prune_row_groups(&groups, None, Some(pruning_predicate), &metrics),
- vec![1]
- );
- }
-
- #[test]
- fn row_group_pruning_predicate_decimal_type() {
- // For the decimal data type, parquet can use `INT32`, `INT64`, `BYTE_ARRAY`, `FIXED_LENGTH_BYTE_ARRAY` to
- // store the data.
- // Here, construct statistics of these four physical types to be filtered with the decimal predicate.
-
- // INT32: c1 > 5, where c1 is decimal(9,2)
- // The type of the scalar value is decimal(9,2), so no cast is needed
- let expr = col("c1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2)));
- let schema =
- Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 2), false)]);
- let schema_descr = get_test_schema_descr(vec![(
- "c1",
- PhysicalType::INT32,
- Some(LogicalType::Decimal {
- scale: 2,
- precision: 9,
- }),
- Some(9),
- Some(2),
- None,
- )]);
- let pruning_predicate =
- PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
- let rgm1 = get_row_group_meta_data(
- &schema_descr,
- // [1.00, 6.00]
- // c1 > 5, this row group will be included in the results.
- vec![ParquetStatistics::int32(
- Some(100),
- Some(600),
- None,
- 0,
- false,
- )],
- );
- let rgm2 = get_row_group_meta_data(
- &schema_descr,
- // [0.1, 0.2]
- // c1 > 5, this row group will not be included in the results.
- vec![ParquetStatistics::int32(Some(10), Some(20), None, 0, false)],
- );
- let metrics = parquet_file_metrics();
- assert_eq!(
- prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
- vec![0]
- );
-
- // INT32: c1 > 5, but the parquet decimal type has a different precision or scale from the arrow decimal
- // The c1 type is decimal(9,0) in the parquet file, and the type of the scalar is decimal(5,2).
- // We should convert both types to the coercion type, which is decimal(11,2)
- // (the arrow decimal is decimal(5,2), the parquet decimal is decimal(9,0))
- let expr = cast(col("c1"), DataType::Decimal128(11, 2)).gt(cast(
- lit(ScalarValue::Decimal128(Some(500), 5, 2)),
- Decimal128(11, 2),
- ));
- let schema =
- Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 0), false)]);
- let schema_descr = get_test_schema_descr(vec![(
- "c1",
- PhysicalType::INT32,
- Some(LogicalType::Decimal {
- scale: 0,
- precision: 9,
- }),
- Some(9),
- Some(0),
- None,
- )]);
- let pruning_predicate =
- PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
- let rgm1 = get_row_group_meta_data(
- &schema_descr,
- // [100, 600]
- // c1 > 5, this row group will be included in the results.
- vec![ParquetStatistics::int32(
- Some(100),
- Some(600),
- None,
- 0,
- false,
- )],
- );
- let rgm2 = get_row_group_meta_data(
- &schema_descr,
- // [10, 20]
- // c1 > 5, this row group will be included in the results.
- vec![ParquetStatistics::int32(Some(10), Some(20), None, 0, false)],
- );
- let rgm3 = get_row_group_meta_data(
- &schema_descr,
- // [0, 2]
- // c1 > 5, this row group will not be included in the results.
- vec![ParquetStatistics::int32(Some(0), Some(2), None, 0, false)],
- );
- let metrics = parquet_file_metrics();
- assert_eq!(
- prune_row_groups(
- &[rgm1, rgm2, rgm3],
- None,
- Some(pruning_predicate),
- &metrics
- ),
- vec![0, 1]
- );
-
- // INT64: c1 < 5, where c1 is decimal(18,2)
- let expr = col("c1").lt(lit(ScalarValue::Decimal128(Some(500), 18, 2)));
- let schema =
- Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
- let schema_descr = get_test_schema_descr(vec![(
- "c1",
- PhysicalType::INT64,
- Some(LogicalType::Decimal {
- scale: 2,
- precision: 18,
- }),
- Some(18),
- Some(2),
- None,
- )]);
- let pruning_predicate =
- PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
- let rgm1 = get_row_group_meta_data(
- &schema_descr,
- // [6.00, 8.00]
- vec![ParquetStatistics::int32(
- Some(600),
- Some(800),
- None,
- 0,
- false,
- )],
- );
- let rgm2 = get_row_group_meta_data(
- &schema_descr,
- // [0.1, 0.2]
- vec![ParquetStatistics::int64(Some(10), Some(20), None, 0, false)],
- );
- let metrics = parquet_file_metrics();
- assert_eq!(
- prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
- vec![1]
- );
-
- // FIXED_LENGTH_BYTE_ARRAY: c1 = decimal128(100000, 28, 3), where c1 is decimal(18,2)
- // the parquet type is decimal(18,2)
- let schema =
- Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
- // cast the type of c1 to decimal(28,3)
- let left = cast(col("c1"), DataType::Decimal128(28, 3));
- let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
- let schema_descr = get_test_schema_descr(vec![(
- "c1",
- PhysicalType::FIXED_LEN_BYTE_ARRAY,
- Some(LogicalType::Decimal {
- scale: 2,
- precision: 18,
- }),
- Some(18),
- Some(2),
- Some(16),
- )]);
- let pruning_predicate =
- PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
- // we must use big-endian when encoding the i128 to bytes or Vec<u8>.
- let rgm1 = get_row_group_meta_data(
- &schema_descr,
- vec![ParquetStatistics::fixed_len_byte_array(
- // 5.00
- Some(FixedLenByteArray::from(ByteArray::from(
- 500i128.to_be_bytes().to_vec(),
- ))),
- // 80.00
- Some(FixedLenByteArray::from(ByteArray::from(
- 8000i128.to_be_bytes().to_vec(),
- ))),
- None,
- 0,
- false,
- )],
- );
- let rgm2 = get_row_group_meta_data(
- &schema_descr,
- vec![ParquetStatistics::fixed_len_byte_array(
- // 5.00
- Some(FixedLenByteArray::from(ByteArray::from(
- 500i128.to_be_bytes().to_vec(),
- ))),
- // 200.00
- Some(FixedLenByteArray::from(ByteArray::from(
- 20000i128.to_be_bytes().to_vec(),
- ))),
- None,
- 0,
- false,
- )],
- );
- let metrics = parquet_file_metrics();
- assert_eq!(
- prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
- vec![1]
- );
-
- // TODO: BYTE_ARRAY: support reading decimal from parquet after the 20.0.0 arrow-rs release
- }
-
- fn get_row_group_meta_data(
- schema_descr: &SchemaDescPtr,
- column_statistics: Vec<ParquetStatistics>,
- ) -> RowGroupMetaData {
- use parquet::file::metadata::ColumnChunkMetaData;
- let mut columns = vec![];
- for (i, s) in column_statistics.iter().enumerate() {
- let column = ColumnChunkMetaData::builder(schema_descr.column(i))
- .set_statistics(s.clone())
- .build()
- .unwrap();
- columns.push(column);
- }
- RowGroupMetaData::builder(schema_descr.clone())
- .set_num_rows(1000)
- .set_total_byte_size(2000)
- .set_column_metadata(columns)
- .build()
- .unwrap()
- }
-
- #[allow(clippy::type_complexity)]
- fn get_test_schema_descr(
- fields: Vec<(
- &str,
- PhysicalType,
- Option<LogicalType>,
- Option<i32>, // precision
- Option<i32>, // scale
- Option<i32>, // length of bytes
- )>,
- ) -> SchemaDescPtr {
- use parquet::schema::types::{SchemaDescriptor, Type as SchemaType};
- let mut schema_fields = fields
- .iter()
- .map(|(n, t, logical, precision, scale, length)| {
- let mut builder = SchemaType::primitive_type_builder(n, *t);
- // add logical type for the parquet field
- match logical {
- None => {}
- Some(logical_type) => {
- builder = builder.with_logical_type(Some(logical_type.clone()));
- }
- };
- match precision {
- None => {}
- Some(v) => {
- builder = builder.with_precision(*v);
- }
- };
- match scale {
- None => {}
- Some(v) => {
- builder = builder.with_scale(*v);
- }
- }
- match length {
- None => {}
- Some(v) => {
- builder = builder.with_length(*v);
- }
- }
- Arc::new(builder.build().unwrap())
- })
- .collect::<Vec<_>>();
- let schema = SchemaType::group_type_builder("schema")
- .with_fields(&mut schema_fields)
- .build()
- .unwrap();
-
- Arc::new(SchemaDescriptor::new(Arc::new(schema)))
- }
-
fn populate_csv_partitions(
tmp_dir: &TempDir,
partition_count: usize,
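The remainder of the diff adds the new row_groups.rs module. One detail worth noting before reading it: parquet stores decimal statistics for FIXED_LEN_BYTE_ARRAY columns as big-endian bytes, and the module sign-extends them into an i128 (see from_bytes_to_i128 below). Here is a hedged standalone sketch of the same decoding with a round-trip check; the function name here is mine, not the module's.

// Sign-extend a big-endian byte slice (at most 16 bytes) into an i128,
// mirroring the from_bytes_to_i128 helper added below.
fn be_bytes_to_i128(b: &[u8]) -> i128 {
    assert!(b.len() <= 16, "Decimal128 supports at most 16 bytes");
    // If the sign bit of the first byte is set, fill with 0xFF so the
    // value stays negative after extension to 16 bytes.
    let negative = b[0] & 0x80 != 0;
    let mut buf = if negative { [0xFFu8; 16] } else { [0u8; 16] };
    buf[16 - b.len()..].copy_from_slice(b);
    i128::from_be_bytes(buf)
}

fn main() {
    // A full 16-byte encoding round-trips unchanged.
    assert_eq!(be_bytes_to_i128(&500i128.to_be_bytes()), 500);
    // A truncated two-byte encoding of -200 (0xFF38) is sign-extended.
    assert_eq!(be_bytes_to_i128(&[0xFF, 0x38]), -200);
}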
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs b/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs
new file mode 100644
index 000000000..d7cbb1984
--- /dev/null
+++ b/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs
@@ -0,0 +1,744 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::{
+ array::ArrayRef,
+ datatypes::{DataType, Schema},
+};
+use datafusion_common::Column;
+use datafusion_common::ScalarValue;
+use log::debug;
+
+use parquet::{
+ file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics},
+ schema::types::ColumnDescriptor,
+};
+
+use crate::{
+ datasource::listing::FileRange,
+ physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
+};
+use parquet::basic::{ConvertedType, LogicalType};
+
+use super::ParquetFileMetrics;
+
+pub(crate) fn prune_row_groups(
+ groups: &[RowGroupMetaData],
+ range: Option<FileRange>,
+ predicate: Option<PruningPredicate>,
+ metrics: &ParquetFileMetrics,
+) -> Vec<usize> {
+ // TODO: Columnar pruning
+ let mut filtered = Vec::with_capacity(groups.len());
+ for (idx, metadata) in groups.iter().enumerate() {
+ if let Some(range) = &range {
+ let offset = metadata.column(0).file_offset();
+ if offset < range.start || offset >= range.end {
+ continue;
+ }
+ }
+
+ if let Some(predicate) = &predicate {
+ let pruning_stats = RowGroupPruningStatistics {
+ row_group_metadata: metadata,
+ parquet_schema: predicate.schema().as_ref(),
+ };
+ match predicate.prune(&pruning_stats) {
+ Ok(values) => {
+ // NB: false means don't scan row group
+ if !values[0] {
+ metrics.row_groups_pruned.add(1);
+ continue;
+ }
+ }
+ // stats filter array could not be built
+ // in that case, do not filter out the row group
+ Err(e) => {
+ debug!("Error evaluating row group predicate values {}", e);
+ metrics.predicate_evaluation_errors.add(1);
+ }
+ }
+ }
+
+ filtered.push(idx)
+ }
+ filtered
+}
+
+/// Wraps parquet statistics in a way
+/// that implements [`PruningStatistics`]
+struct RowGroupPruningStatistics<'a> {
+ row_group_metadata: &'a RowGroupMetaData,
+ parquet_schema: &'a Schema,
+}
+
+// TODO: consolidate code with arrow-rs
+// Convert the byte array to i128.
+// The input byte array must be big-endian.
+// Copied from arrow-rs
+fn from_bytes_to_i128(b: &[u8]) -> i128 {
+ assert!(b.len() <= 16, "Decimal128Array supports only up to size 16");
+ let first_bit = b[0] & 128u8 == 128u8;
+ let mut result = if first_bit { [255u8; 16] } else { [0u8; 16] };
+ for (i, v) in b.iter().enumerate() {
+ result[i + (16 - b.len())] = *v;
+ }
+ // The byte array comes from the parquet file and must be big-endian.
+ // The endianness is defined by the parquet format; see the reference document
+ // https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66
+ i128::from_be_bytes(result)
+}
+
+/// Extract the min/max statistics from a `ParquetStatistics` object
+macro_rules! get_statistic {
+ ($column_statistics:expr, $func:ident, $bytes_func:ident, $target_arrow_type:expr) => {{
+ if !$column_statistics.has_min_max_set() {
+ return None;
+ }
+ match $column_statistics {
+ ParquetStatistics::Boolean(s) => Some(ScalarValue::Boolean(Some(*s.$func()))),
+ ParquetStatistics::Int32(s) => {
+ match $target_arrow_type {
+ // int32 to decimal with the precision and scale
+ Some(DataType::Decimal128(precision, scale)) => {
+ Some(ScalarValue::Decimal128(
+ Some(*s.$func() as i128),
+ precision,
+ scale,
+ ))
+ }
+ _ => Some(ScalarValue::Int32(Some(*s.$func()))),
+ }
+ }
+ ParquetStatistics::Int64(s) => {
+ match $target_arrow_type {
+ // int64 to decimal with the precision and scale
+ Some(DataType::Decimal128(precision, scale)) => {
+ Some(ScalarValue::Decimal128(
+ Some(*s.$func() as i128),
+ precision,
+ scale,
+ ))
+ }
+ _ => Some(ScalarValue::Int64(Some(*s.$func()))),
+ }
+ }
+ // 96 bit ints not supported
+ ParquetStatistics::Int96(_) => None,
+ ParquetStatistics::Float(s) => Some(ScalarValue::Float32(Some(*s.$func()))),
+ ParquetStatistics::Double(s) => Some(ScalarValue::Float64(Some(*s.$func()))),
+ ParquetStatistics::ByteArray(s) => {
+ // TODO support decimal type for byte array type
+ let s = std::str::from_utf8(s.$bytes_func())
+ .map(|s| s.to_string())
+ .ok();
+ Some(ScalarValue::Utf8(s))
+ }
+ // type not supported yet
+ ParquetStatistics::FixedLenByteArray(s) => {
+ match $target_arrow_type {
+ // just support the decimal data type
+ Some(DataType::Decimal128(precision, scale)) => {
+ Some(ScalarValue::Decimal128(
+ Some(from_bytes_to_i128(s.$bytes_func())),
+ precision,
+ scale,
+ ))
+ }
+ _ => None,
+ }
+ }
+ }
+ }};
+}
+
+// Extract the min or max value by calling `func` or `bytes_func` on the ParquetStatistics as appropriate
+macro_rules! get_min_max_values {
+ ($self:expr, $column:expr, $func:ident, $bytes_func:ident) => {{
+ let (_column_index, field) =
+ if let Some((v, f)) = $self.parquet_schema.column_with_name(&$column.name) {
+ (v, f)
+ } else {
+ // Named column was not present
+ return None;
+ };
+
+ let data_type = field.data_type();
+ // The result may be None, because DataFusion doesn't have support for ScalarValues of the column type
+ let null_scalar: ScalarValue = data_type.try_into().ok()?;
+
+ $self.row_group_metadata
+ .columns()
+ .iter()
+ .find(|c| c.column_descr().name() == &$column.name)
+ .and_then(|c| if c.statistics().is_some() {Some((c.statistics().unwrap(), c.column_descr()))} else {None})
+ .map(|(stats, column_descr)|
+ {
+ let target_data_type = parquet_to_arrow_decimal_type(column_descr);
+ get_statistic!(stats, $func, $bytes_func, target_data_type)
+ })
+ .flatten()
+ // column either didn't have statistics at all or didn't have min/max values
+ .or_else(|| Some(null_scalar.clone()))
+ .map(|s| s.to_array())
+ }}
+}
+
+// Extract the null count value from the ParquetStatistics
+macro_rules! get_null_count_values {
+ ($self:expr, $column:expr) => {{
+ let value = ScalarValue::UInt64(
+ if let Some(col) = $self
+ .row_group_metadata
+ .columns()
+ .iter()
+ .find(|c| c.column_descr().name() == &$column.name)
+ {
+ col.statistics().map(|s| s.null_count())
+ } else {
+ Some($self.row_group_metadata.num_rows() as u64)
+ },
+ );
+
+ Some(value.to_array())
+ }};
+}
+
+// Convert the parquet column schema to an arrow data type, considering only
+// the decimal data type.
+fn parquet_to_arrow_decimal_type(parquet_column: &ColumnDescriptor) -> Option<DataType> {
+ let type_ptr = parquet_column.self_type_ptr();
+ match type_ptr.get_basic_info().logical_type() {
+ Some(LogicalType::Decimal { scale, precision }) => {
+ Some(DataType::Decimal128(precision as u8, scale as u8))
+ }
+ _ => match type_ptr.get_basic_info().converted_type() {
+ ConvertedType::DECIMAL => Some(DataType::Decimal128(
+ type_ptr.get_precision() as u8,
+ type_ptr.get_scale() as u8,
+ )),
+ _ => None,
+ },
+ }
+}
+
+impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
+ fn min_values(&self, column: &Column) -> Option<ArrayRef> {
+ get_min_max_values!(self, column, min, min_bytes)
+ }
+
+ fn max_values(&self, column: &Column) -> Option<ArrayRef> {
+ get_min_max_values!(self, column, max, max_bytes)
+ }
+
+ fn num_containers(&self) -> usize {
+ 1
+ }
+
+ fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
+ get_null_count_values!(self, column)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
+ use arrow::datatypes::DataType::Decimal128;
+ use arrow::datatypes::Schema;
+ use arrow::datatypes::{DataType, Field};
+ use datafusion_expr::{cast, col, lit};
+ use parquet::basic::LogicalType;
+ use parquet::data_type::{ByteArray, FixedLenByteArray};
+ use parquet::{
+ basic::Type as PhysicalType,
+ file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics},
+ schema::types::SchemaDescPtr,
+ };
+ use std::sync::Arc;
+
+ #[test]
+ fn row_group_pruning_predicate_simple_expr() {
+ use datafusion_expr::{col, lit};
+ // c1 > 15 => c1_max > 15
+ let expr = col("c1").gt(lit(15));
+ let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
+ let pruning_predicate =
+ PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+ let schema_descr = get_test_schema_descr(vec![(
+ "c1",
+ PhysicalType::INT32,
+ None,
+ None,
+ None,
+ None,
+ )]);
+ let rgm1 = get_row_group_meta_data(
+ &schema_descr,
+ vec![ParquetStatistics::int32(Some(1), Some(10), None, 0, false)],
+ );
+ let rgm2 = get_row_group_meta_data(
+ &schema_descr,
+ vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)],
+ );
+
+ let metrics = parquet_file_metrics();
+ assert_eq!(
+ prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
+ vec![1]
+ );
+ }
+
+ #[test]
+ fn row_group_pruning_predicate_missing_stats() {
+ use datafusion_expr::{col, lit};
+ // c1 > 15 => c1_max > 15
+ let expr = col("c1").gt(lit(15));
+ let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
+ let pruning_predicate =
+ PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+
+ let schema_descr = get_test_schema_descr(vec![(
+ "c1",
+ PhysicalType::INT32,
+ None,
+ None,
+ None,
+ None,
+ )]);
+ let rgm1 = get_row_group_meta_data(
+ &schema_descr,
+ vec![ParquetStatistics::int32(None, None, None, 0, false)],
+ );
+ let rgm2 = get_row_group_meta_data(
+ &schema_descr,
+ vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)],
+ );
+ let metrics = parquet_file_metrics();
+ // missing statistics for the first row group mean that the result of the predicate expression
+ // is null / undefined, so the first row group can't be filtered out
+ assert_eq!(
+ prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
+ vec![0, 1]
+ );
+ }
+
+ #[test]
+ fn row_group_pruning_predicate_partial_expr() {
+ use datafusion_expr::{col, lit};
+ // test row group predicate with partially supported expression
+ // c1 > 15 and c2 % 2 => c1_max > 15 and true
+ let expr = col("c1").gt(lit(15)).and(col("c2").modulus(lit(2)));
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("c1", DataType::Int32, false),
+ Field::new("c2", DataType::Int32, false),
+ ]));
+ let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
+
+ let schema_descr = get_test_schema_descr(vec![
+ ("c1", PhysicalType::INT32, None, None, None, None),
+ ("c2", PhysicalType::INT32, None, None, None, None),
+ ]);
+ let rgm1 = get_row_group_meta_data(
+ &schema_descr,
+ vec![
+ ParquetStatistics::int32(Some(1), Some(10), None, 0, false),
+ ParquetStatistics::int32(Some(1), Some(10), None, 0, false),
+ ],
+ );
+ let rgm2 = get_row_group_meta_data(
+ &schema_descr,
+ vec![
+ ParquetStatistics::int32(Some(11), Some(20), None, 0, false),
+ ParquetStatistics::int32(Some(11), Some(20), None, 0, false),
+ ],
+ );
+
+ let metrics = parquet_file_metrics();
+ let groups = &[rgm1, rgm2];
+ // the first row group is still filtered out because the predicate expression can be partially evaluated
+ // when conditions are joined using AND
+ assert_eq!(
+ prune_row_groups(groups, None, Some(pruning_predicate), &metrics),
+ vec![1]
+ );
+
+ // if conditions in predicate are joined with OR and an unsupported expression is used
+ // this bypasses the entire predicate expression and no row groups are filtered out
+ let expr = col("c1").gt(lit(15)).or(col("c2").modulus(lit(2)));
+ let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
+
+ // if conditions in predicate are joined with OR and an unsupported expression is used
+ // this bypasses the entire predicate expression and no row groups are filtered out
+ assert_eq!(
+ prune_row_groups(groups, None, Some(pruning_predicate), &metrics),
+ vec![0, 1]
+ );
+ }
+
+ fn gen_row_group_meta_data_for_pruning_predicate() -> Vec<RowGroupMetaData> {
+ let schema_descr = get_test_schema_descr(vec![
+ ("c1", PhysicalType::INT32, None, None, None, None),
+ ("c2", PhysicalType::BOOLEAN, None, None, None, None),
+ ]);
+ let rgm1 = get_row_group_meta_data(
+ &schema_descr,
+ vec![
+ ParquetStatistics::int32(Some(1), Some(10), None, 0, false),
+ ParquetStatistics::boolean(Some(false), Some(true), None, 0, false),
+ ],
+ );
+ let rgm2 = get_row_group_meta_data(
+ &schema_descr,
+ vec![
+ ParquetStatistics::int32(Some(11), Some(20), None, 0, false),
+ ParquetStatistics::boolean(Some(false), Some(true), None, 1, false),
+ ],
+ );
+ vec![rgm1, rgm2]
+ }
+
+ #[test]
+ fn row_group_pruning_predicate_null_expr() {
+ use datafusion_expr::{col, lit};
+ // c1 > 15 and IsNull(c2) => c1_max > 15 and c2_null_count > 0
+ let expr = col("c1").gt(lit(15)).and(col("c2").is_null());
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("c1", DataType::Int32, false),
+ Field::new("c2", DataType::Boolean, false),
+ ]));
+ let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
+ let groups = gen_row_group_meta_data_for_pruning_predicate();
+
+ let metrics = parquet_file_metrics();
+ // The first row group is filtered out because it contains no null values in "c2".
+ assert_eq!(
+ prune_row_groups(&groups, None, Some(pruning_predicate), &metrics),
+ vec![1]
+ );
+ }
+
+ #[test]
+ fn row_group_pruning_predicate_eq_null_expr() {
+ use datafusion_expr::{col, lit};
+ // test row group predicate with an unknown (Null) expr
+ //
+ // c1 > 15 and c2 = NULL => c1_max > 15 and null
+ let expr = col("c1")
+ .gt(lit(15))
+ .and(col("c2").eq(lit(ScalarValue::Boolean(None))));
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("c1", DataType::Int32, false),
+ Field::new("c2", DataType::Boolean, false),
+ ]));
+ let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
+ let groups = gen_row_group_meta_data_for_pruning_predicate();
+
+ let metrics = parquet_file_metrics();
+ // c2 = NULL always evaluates to NULL (and thus will not
+ // pass the predicate). Ideally these should both be false
+ assert_eq!(
+ prune_row_groups(&groups, None, Some(pruning_predicate), &metrics),
+ vec![1]
+ );
+ }
+
+ #[test]
+ fn row_group_pruning_predicate_decimal_type() {
+ // For the decimal data type, parquet can use `INT32`, `INT64`, `BYTE_ARRAY`, `FIXED_LENGTH_BYTE_ARRAY` to
+ // store the data.
+ // Here, construct statistics of these four physical types to be filtered with the decimal predicate.
+
+ // INT32: c1 > 5, where c1 is decimal(9,2)
+ // The type of the scalar value is decimal(9,2), so no cast is needed
+ let expr = col("c1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2)));
+ let schema =
+ Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 2), false)]);
+ let schema_descr = get_test_schema_descr(vec![(
+ "c1",
+ PhysicalType::INT32,
+ Some(LogicalType::Decimal {
+ scale: 2,
+ precision: 9,
+ }),
+ Some(9),
+ Some(2),
+ None,
+ )]);
+ let pruning_predicate =
+ PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+ let rgm1 = get_row_group_meta_data(
+ &schema_descr,
+ // [1.00, 6.00]
+ // c1 > 5, this row group will be included in the results.
+ vec![ParquetStatistics::int32(
+ Some(100),
+ Some(600),
+ None,
+ 0,
+ false,
+ )],
+ );
+ let rgm2 = get_row_group_meta_data(
+ &schema_descr,
+ // [0.1, 0.2]
+ // c1 > 5, this row group will not be included in the results.
+ vec![ParquetStatistics::int32(Some(10), Some(20), None, 0, false)],
+ );
+ let metrics = parquet_file_metrics();
+ assert_eq!(
+ prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
+ vec![0]
+ );
+
+ // INT32: c1 > 5, but the parquet decimal type has a different precision or scale from the arrow decimal
+ // The c1 type is decimal(9,0) in the parquet file, and the type of the scalar is decimal(5,2).
+ // We should convert both types to the coercion type, which is decimal(11,2)
+ // (the arrow decimal is decimal(5,2), the parquet decimal is decimal(9,0))
+ let expr = cast(col("c1"), DataType::Decimal128(11, 2)).gt(cast(
+ lit(ScalarValue::Decimal128(Some(500), 5, 2)),
+ Decimal128(11, 2),
+ ));
+ let schema =
+ Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 0), false)]);
+ let schema_descr = get_test_schema_descr(vec![(
+ "c1",
+ PhysicalType::INT32,
+ Some(LogicalType::Decimal {
+ scale: 0,
+ precision: 9,
+ }),
+ Some(9),
+ Some(0),
+ None,
+ )]);
+ let pruning_predicate =
+ PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+ let rgm1 = get_row_group_meta_data(
+ &schema_descr,
+ // [100, 600]
+ // c1 > 5, this row group will be included in the results.
+ vec![ParquetStatistics::int32(
+ Some(100),
+ Some(600),
+ None,
+ 0,
+ false,
+ )],
+ );
+ let rgm2 = get_row_group_meta_data(
+ &schema_descr,
+ // [10, 20]
+ // c1 > 5, this row group will be included in the results.
+ vec![ParquetStatistics::int32(Some(10), Some(20), None, 0, false)],
+ );
+ let rgm3 = get_row_group_meta_data(
+ &schema_descr,
+ // [0, 2]
+ // c1 > 5, this row group will not be included in the results.
+ vec![ParquetStatistics::int32(Some(0), Some(2), None, 0, false)],
+ );
+ let metrics = parquet_file_metrics();
+ assert_eq!(
+ prune_row_groups(
+ &[rgm1, rgm2, rgm3],
+ None,
+ Some(pruning_predicate),
+ &metrics
+ ),
+ vec![0, 1]
+ );
+
+ // INT64: c1 < 5, where c1 is decimal(18,2)
+ let expr = col("c1").lt(lit(ScalarValue::Decimal128(Some(500), 18, 2)));
+ let schema =
+ Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
+ let schema_descr = get_test_schema_descr(vec![(
+ "c1",
+ PhysicalType::INT64,
+ Some(LogicalType::Decimal {
+ scale: 2,
+ precision: 18,
+ }),
+ Some(18),
+ Some(2),
+ None,
+ )]);
+ let pruning_predicate =
+ PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+ let rgm1 = get_row_group_meta_data(
+ &schema_descr,
+ // [6.00, 8.00]
+ vec![ParquetStatistics::int32(
+ Some(600),
+ Some(800),
+ None,
+ 0,
+ false,
+ )],
+ );
+ let rgm2 = get_row_group_meta_data(
+ &schema_descr,
+ // [0.1, 0.2]
+ vec![ParquetStatistics::int64(Some(10), Some(20), None, 0, false)],
+ );
+ let metrics = parquet_file_metrics();
+ assert_eq!(
+ prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
+ vec![1]
+ );
+
+ // FIXED_LENGTH_BYTE_ARRAY: c1 = decimal128(100000, 28, 3), where c1 is decimal(18,2)
+ // the parquet type is decimal(18,2)
+ let schema =
+ Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
+ // cast the type of c1 to decimal(28,3)
+ let left = cast(col("c1"), DataType::Decimal128(28, 3));
+ let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
+ let schema_descr = get_test_schema_descr(vec![(
+ "c1",
+ PhysicalType::FIXED_LEN_BYTE_ARRAY,
+ Some(LogicalType::Decimal {
+ scale: 2,
+ precision: 18,
+ }),
+ Some(18),
+ Some(2),
+ Some(16),
+ )]);
+ let pruning_predicate =
+ PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+ // we must use big-endian when encoding the i128 to bytes or Vec<u8>.
+ let rgm1 = get_row_group_meta_data(
+ &schema_descr,
+ vec![ParquetStatistics::fixed_len_byte_array(
+ // 5.00
+ Some(FixedLenByteArray::from(ByteArray::from(
+ 500i128.to_be_bytes().to_vec(),
+ ))),
+ // 80.00
+ Some(FixedLenByteArray::from(ByteArray::from(
+ 8000i128.to_be_bytes().to_vec(),
+ ))),
+ None,
+ 0,
+ false,
+ )],
+ );
+ let rgm2 = get_row_group_meta_data(
+ &schema_descr,
+ vec![ParquetStatistics::fixed_len_byte_array(
+ // 5.00
+ Some(FixedLenByteArray::from(ByteArray::from(
+ 500i128.to_be_bytes().to_vec(),
+ ))),
+ // 200.00
+ Some(FixedLenByteArray::from(ByteArray::from(
+ 20000i128.to_be_bytes().to_vec(),
+ ))),
+ None,
+ 0,
+ false,
+ )],
+ );
+ let metrics = parquet_file_metrics();
+ assert_eq!(
+ prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
+ vec![1]
+ );
+
+ // TODO: BYTE_ARRAY: support reading decimal from parquet after the 20.0.0 arrow-rs release
+ }
+
+ fn get_row_group_meta_data(
+ schema_descr: &SchemaDescPtr,
+ column_statistics: Vec<ParquetStatistics>,
+ ) -> RowGroupMetaData {
+ use parquet::file::metadata::ColumnChunkMetaData;
+ let mut columns = vec![];
+ for (i, s) in column_statistics.iter().enumerate() {
+ let column = ColumnChunkMetaData::builder(schema_descr.column(i))
+ .set_statistics(s.clone())
+ .build()
+ .unwrap();
+ columns.push(column);
+ }
+ RowGroupMetaData::builder(schema_descr.clone())
+ .set_num_rows(1000)
+ .set_total_byte_size(2000)
+ .set_column_metadata(columns)
+ .build()
+ .unwrap()
+ }
+
+ #[allow(clippy::type_complexity)]
+ fn get_test_schema_descr(
+ fields: Vec<(
+ &str,
+ PhysicalType,
+ Option<LogicalType>,
+ Option<i32>, // precision
+ Option<i32>, // scale
+ Option<i32>, // length of bytes
+ )>,
+ ) -> SchemaDescPtr {
+ use parquet::schema::types::{SchemaDescriptor, Type as SchemaType};
+ let mut schema_fields = fields
+ .iter()
+ .map(|(n, t, logical, precision, scale, length)| {
+ let mut builder = SchemaType::primitive_type_builder(n, *t);
+ // add logical type for the parquet field
+ match logical {
+ None => {}
+ Some(logical_type) => {
+ builder = builder.with_logical_type(Some(logical_type.clone()));
+ }
+ };
+ match precision {
+ None => {}
+ Some(v) => {
+ builder = builder.with_precision(*v);
+ }
+ };
+ match scale {
+ None => {}
+ Some(v) => {
+ builder = builder.with_scale(*v);
+ }
+ }
+ match length {
+ None => {}
+ Some(v) => {
+ builder = builder.with_length(*v);
+ }
+ }
+ Arc::new(builder.build().unwrap())
+ })
+ .collect::<Vec<_>>();
+ let schema = SchemaType::group_type_builder("schema")
+ .with_fields(&mut schema_fields)
+ .build()
+ .unwrap();
+
+ Arc::new(SchemaDescriptor::new(Arc::new(schema)))
+ }
+
+ fn parquet_file_metrics() -> ParquetFileMetrics {
+ let metrics = Arc::new(ExecutionPlanMetricsSet::new());
+ ParquetFileMetrics::new(0, "file.parquet", &metrics)
+ }
+}
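A closing note on the range check at the top of prune_row_groups: when one file is split across several scan partitions, each row group is assigned to exactly one partition by testing whether the file offset of its first column chunk falls inside that partition's byte range. A minimal sketch of that check, with FileRange redefined locally as a stand-in for the datafusion type:

// Sketch of the per-partition range check in prune_row_groups.
// FileRange here is a local stand-in, not datafusion's type.
struct FileRange {
    start: i64,
    end: i64,
}

// A row group belongs to the partition whose half-open byte range
// [start, end) contains the offset of its first column chunk.
fn in_partition(range: &FileRange, first_column_offset: i64) -> bool {
    first_column_offset >= range.start && first_column_offset < range.end
}

fn main() {
    let range = FileRange { start: 0, end: 4096 };
    assert!(in_partition(&range, 4)); // starts inside this partition's slice
    assert!(!in_partition(&range, 8192)); // belongs to a later partition
}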