Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/10 12:33:39 UTC

[arrow-datafusion] branch master updated: Minor: Extract parquet row group pruning code into its own module (#4160)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d621c5c0 Minor: Extract parquet row group pruning code into its own module (#4160)
4d621c5c0 is described below

commit 4d621c5c0da387f50c9ab6f3a9ee568062ee9976
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Thu Nov 10 07:33:34 2022 -0500

    Minor: Extract parquet row group pruning code into its own module (#4160)
---
 .../core/src/physical_plan/file_format/parquet.rs  | 729 +-------------------
 .../file_format/parquet/row_groups.rs              | 744 +++++++++++++++++++++
 2 files changed, 755 insertions(+), 718 deletions(-)

diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 36c8392db..cf8763b07 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -17,18 +17,18 @@
 
 //! Execution plan for reading Parquet files
 
+use arrow::datatypes::SchemaRef;
 use fmt::Debug;
+use std::any::Any;
 use std::fmt;
 use std::fs;
 use std::ops::Range;
 use std::sync::Arc;
-use std::{any::Any, convert::TryInto};
 
 use crate::config::OPT_PARQUET_ENABLE_PAGE_INDEX;
 use crate::config::OPT_PARQUET_PUSHDOWN_FILTERS;
 use crate::config::OPT_PARQUET_REORDER_FILTERS;
 use crate::datasource::file_format::parquet::fetch_parquet_metadata;
-use crate::datasource::listing::FileRange;
 use crate::physical_plan::file_format::file_stream::{
     FileOpenFuture, FileOpener, FileStream,
 };
@@ -36,7 +36,7 @@ use crate::physical_plan::file_format::FileMeta;
 use crate::{
     error::{DataFusionError, Result},
     execution::context::{SessionState, TaskContext},
-    physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
+    physical_optimizer::pruning::PruningPredicate,
     physical_plan::{
         expressions::PhysicalSortExpr,
         file_format::{FileScanConfig, SchemaAdapter},
@@ -44,16 +44,9 @@ use crate::{
         DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
         Statistics,
     },
-    scalar::ScalarValue,
-};
-use arrow::datatypes::DataType;
-use arrow::{
-    array::ArrayRef,
-    datatypes::{Schema, SchemaRef},
-    error::ArrowError,
 };
+use arrow::error::ArrowError;
 use bytes::Bytes;
-use datafusion_common::Column;
 use datafusion_expr::Expr;
 use futures::future::BoxFuture;
 use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
@@ -62,18 +55,13 @@ use object_store::{ObjectMeta, ObjectStore};
 use parquet::arrow::arrow_reader::ArrowReaderOptions;
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask};
-use parquet::basic::{ConvertedType, LogicalType};
 use parquet::errors::ParquetError;
-use parquet::file::{
-    metadata::{ParquetMetaData, RowGroupMetaData},
-    properties::WriterProperties,
-    statistics::Statistics as ParquetStatistics,
-};
-use parquet::schema::types::ColumnDescriptor;
+use parquet::file::{metadata::ParquetMetaData, properties::WriterProperties};
 
 mod metrics;
 mod page_filter;
 mod row_filter;
+mod row_groups;
 
 pub use metrics::ParquetFileMetrics;
 
@@ -434,7 +422,7 @@ impl FileOpener for ParquetOpener {
             // Row group pruning: attempt to skip entire row_groups
             // using metadata on the row groups
             let file_metadata = builder.metadata();
-            let row_groups = prune_row_groups(
+            let row_groups = row_groups::prune_row_groups(
                 file_metadata.row_groups(),
                 file_range,
                 pruning_predicate.clone(),
@@ -597,224 +585,6 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
     }
 }
 
-/// Wraps parquet statistics in a way
-/// that implements [`PruningStatistics`]
-struct RowGroupPruningStatistics<'a> {
-    row_group_metadata: &'a RowGroupMetaData,
-    parquet_schema: &'a Schema,
-}
-
-// TODO: consolidate code with arrow-rs
-// Converts a byte array to an i128.
-// The input byte array must be big-endian.
-// Copied from arrow-rs.
-fn from_bytes_to_i128(b: &[u8]) -> i128 {
-    assert!(b.len() <= 16, "Decimal128Array supports only up to size 16");
-    let first_bit = b[0] & 128u8 == 128u8;
-    let mut result = if first_bit { [255u8; 16] } else { [0u8; 16] };
-    for (i, v) in b.iter().enumerate() {
-        result[i + (16 - b.len())] = *v;
-    }
-    // The byte array comes from the parquet file and must be big-endian;
-    // the endianness is defined by the parquet format. See the reference document:
-    // https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66
-    i128::from_be_bytes(result)
-}
-
-/// Extract the min/max statistics from a `ParquetStatistics` object
-macro_rules! get_statistic {
-    ($column_statistics:expr, $func:ident, $bytes_func:ident, $target_arrow_type:expr) => {{
-        if !$column_statistics.has_min_max_set() {
-            return None;
-        }
-        match $column_statistics {
-            ParquetStatistics::Boolean(s) => Some(ScalarValue::Boolean(Some(*s.$func()))),
-            ParquetStatistics::Int32(s) => {
-                match $target_arrow_type {
-                    // int32 to decimal with the precision and scale
-                    Some(DataType::Decimal128(precision, scale)) => {
-                        Some(ScalarValue::Decimal128(
-                            Some(*s.$func() as i128),
-                            precision,
-                            scale,
-                        ))
-                    }
-                    _ => Some(ScalarValue::Int32(Some(*s.$func()))),
-                }
-            }
-            ParquetStatistics::Int64(s) => {
-                match $target_arrow_type {
-                    // int64 to decimal with the precision and scale
-                    Some(DataType::Decimal128(precision, scale)) => {
-                        Some(ScalarValue::Decimal128(
-                            Some(*s.$func() as i128),
-                            precision,
-                            scale,
-                        ))
-                    }
-                    _ => Some(ScalarValue::Int64(Some(*s.$func()))),
-                }
-            }
-            // 96 bit ints not supported
-            ParquetStatistics::Int96(_) => None,
-            ParquetStatistics::Float(s) => Some(ScalarValue::Float32(Some(*s.$func()))),
-            ParquetStatistics::Double(s) => Some(ScalarValue::Float64(Some(*s.$func()))),
-            ParquetStatistics::ByteArray(s) => {
-                // TODO support decimal type for byte array type
-                let s = std::str::from_utf8(s.$bytes_func())
-                    .map(|s| s.to_string())
-                    .ok();
-                Some(ScalarValue::Utf8(s))
-            }
-            // type not supported yet
-            ParquetStatistics::FixedLenByteArray(s) => {
-                match $target_arrow_type {
-                    // just support the decimal data type
-                    Some(DataType::Decimal128(precision, scale)) => {
-                        Some(ScalarValue::Decimal128(
-                            Some(from_bytes_to_i128(s.$bytes_func())),
-                            precision,
-                            scale,
-                        ))
-                    }
-                    _ => None,
-                }
-            }
-        }
-    }};
-}
-
-// Extract the min or max value by calling `func` or `bytes_func` on the ParquetStatistics, as appropriate
-macro_rules! get_min_max_values {
-    ($self:expr, $column:expr, $func:ident, $bytes_func:ident) => {{
-        let (_column_index, field) =
-            if let Some((v, f)) = $self.parquet_schema.column_with_name(&$column.name) {
-                (v, f)
-            } else {
-                // Named column was not present
-                return None;
-            };
-
-        let data_type = field.data_type();
-        // The result may be None because DataFusion doesn't support ScalarValues of the column type
-        let null_scalar: ScalarValue = data_type.try_into().ok()?;
-
-        $self.row_group_metadata
-            .columns()
-            .iter()
-            .find(|c| c.column_descr().name() == &$column.name)
-            .and_then(|c| if c.statistics().is_some() {Some((c.statistics().unwrap(), c.column_descr()))} else {None})
-            .map(|(stats, column_descr)|
-                {
-                    let target_data_type = parquet_to_arrow_decimal_type(column_descr);
-                    get_statistic!(stats, $func, $bytes_func, target_data_type)
-                })
-            .flatten()
-            // column either didn't have statistics at all or didn't have min/max values
-            .or_else(|| Some(null_scalar.clone()))
-            .map(|s| s.to_array())
-    }}
-}
-
-// Extract the null count value on the ParquetStatistics
-macro_rules! get_null_count_values {
-    ($self:expr, $column:expr) => {{
-        let value = ScalarValue::UInt64(
-            if let Some(col) = $self
-                .row_group_metadata
-                .columns()
-                .iter()
-                .find(|c| c.column_descr().name() == &$column.name)
-            {
-                col.statistics().map(|s| s.null_count())
-            } else {
-                Some($self.row_group_metadata.num_rows() as u64)
-            },
-        );
-
-        Some(value.to_array())
-    }};
-}
-
-// Convert the parquet column schema to an arrow data type, considering only
-// the decimal data type.
-fn parquet_to_arrow_decimal_type(parquet_column: &ColumnDescriptor) -> Option<DataType> {
-    let type_ptr = parquet_column.self_type_ptr();
-    match type_ptr.get_basic_info().logical_type() {
-        Some(LogicalType::Decimal { scale, precision }) => {
-            Some(DataType::Decimal128(precision as u8, scale as u8))
-        }
-        _ => match type_ptr.get_basic_info().converted_type() {
-            ConvertedType::DECIMAL => Some(DataType::Decimal128(
-                type_ptr.get_precision() as u8,
-                type_ptr.get_scale() as u8,
-            )),
-            _ => None,
-        },
-    }
-}
-
-impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
-    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
-        get_min_max_values!(self, column, min, min_bytes)
-    }
-
-    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
-        get_min_max_values!(self, column, max, max_bytes)
-    }
-
-    fn num_containers(&self) -> usize {
-        1
-    }
-
-    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
-        get_null_count_values!(self, column)
-    }
-}
-
-fn prune_row_groups(
-    groups: &[RowGroupMetaData],
-    range: Option<FileRange>,
-    predicate: Option<PruningPredicate>,
-    metrics: &ParquetFileMetrics,
-) -> Vec<usize> {
-    // TODO: Columnar pruning
-    let mut filtered = Vec::with_capacity(groups.len());
-    for (idx, metadata) in groups.iter().enumerate() {
-        if let Some(range) = &range {
-            let offset = metadata.column(0).file_offset();
-            if offset < range.start || offset >= range.end {
-                continue;
-            }
-        }
-
-        if let Some(predicate) = &predicate {
-            let pruning_stats = RowGroupPruningStatistics {
-                row_group_metadata: metadata,
-                parquet_schema: predicate.schema().as_ref(),
-            };
-            match predicate.prune(&pruning_stats) {
-                Ok(values) => {
-                    // NB: false means don't scan row group
-                    if !values[0] {
-                        metrics.row_groups_pruned.add(1);
-                        continue;
-                    }
-                }
-                // the stats filter array could not be built;
-                // keep (do not prune) this row group
-                Err(e) => {
-                    debug!("Error evaluating row group predicate values {}", e);
-                    metrics.predicate_evaluation_errors.add(1);
-                }
-            }
-        }
-
-        filtered.push(idx)
-    }
-    filtered
-}
-
 /// Executes a query and writes the results to a partitioned Parquet file.
 pub async fn plan_to_parquet(
     state: &SessionState,
@@ -881,8 +651,8 @@ mod tests {
         datasource::file_format::{parquet::ParquetFormat, FileFormat},
         physical_plan::collect,
     };
-    use arrow::array::{Float32Array, Int32Array};
-    use arrow::datatypes::DataType::Decimal128;
+    use arrow::array::{ArrayRef, Float32Array, Int32Array};
+    use arrow::datatypes::Schema;
     use arrow::record_batch::RecordBatch;
     use arrow::{
         array::{Int64Array, Int8Array, StringArray},
@@ -890,18 +660,12 @@ mod tests {
     };
     use chrono::{TimeZone, Utc};
     use datafusion_common::assert_contains;
-    use datafusion_expr::{cast, col, lit};
+    use datafusion_common::ScalarValue;
+    use datafusion_expr::{col, lit};
     use futures::StreamExt;
     use object_store::local::LocalFileSystem;
     use object_store::path::Path;
     use object_store::ObjectMeta;
-    use parquet::basic::LogicalType;
-    use parquet::data_type::{ByteArray, FixedLenByteArray};
-    use parquet::{
-        basic::Type as PhysicalType,
-        file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics},
-        schema::types::SchemaDescPtr,
-    };
     use std::fs::File;
     use std::io::Write;
     use tempfile::TempDir;
@@ -1756,477 +1520,6 @@ mod tests {
         }
     }
 
-    fn parquet_file_metrics() -> ParquetFileMetrics {
-        let metrics = Arc::new(ExecutionPlanMetricsSet::new());
-        ParquetFileMetrics::new(0, "file.parquet", &metrics)
-    }
-
-    #[test]
-    fn row_group_pruning_predicate_simple_expr() {
-        use datafusion_expr::{col, lit};
-        // c1 > 15 => c1_max > 15
-        let expr = col("c1").gt(lit(15));
-        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
-        let pruning_predicate =
-            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
-        let schema_descr = get_test_schema_descr(vec![(
-            "c1",
-            PhysicalType::INT32,
-            None,
-            None,
-            None,
-            None,
-        )]);
-        let rgm1 = get_row_group_meta_data(
-            &schema_descr,
-            vec![ParquetStatistics::int32(Some(1), Some(10), None, 0, false)],
-        );
-        let rgm2 = get_row_group_meta_data(
-            &schema_descr,
-            vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)],
-        );
-
-        let metrics = parquet_file_metrics();
-        assert_eq!(
-            prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
-            vec![1]
-        );
-    }
-
-    #[test]
-    fn row_group_pruning_predicate_missing_stats() {
-        use datafusion_expr::{col, lit};
-        // c1 > 15 => c1_max > 15
-        let expr = col("c1").gt(lit(15));
-        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
-        let pruning_predicate =
-            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
-
-        let schema_descr = get_test_schema_descr(vec![(
-            "c1",
-            PhysicalType::INT32,
-            None,
-            None,
-            None,
-            None,
-        )]);
-        let rgm1 = get_row_group_meta_data(
-            &schema_descr,
-            vec![ParquetStatistics::int32(None, None, None, 0, false)],
-        );
-        let rgm2 = get_row_group_meta_data(
-            &schema_descr,
-            vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)],
-        );
-        let metrics = parquet_file_metrics();
-        // missing statistics for the first row group mean that the result of the predicate
-        // expression is null / undefined, so the first row group can't be filtered out
-        assert_eq!(
-            prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
-            vec![0, 1]
-        );
-    }
-
-    #[test]
-    fn row_group_pruning_predicate_partial_expr() {
-        use datafusion_expr::{col, lit};
-        // test row group predicate with partially supported expression
-        // c1 > 15 AND c2 % 2 => c1_max > 15 AND true
-        let expr = col("c1").gt(lit(15)).and(col("c2").modulus(lit(2)));
-        let schema = Arc::new(Schema::new(vec![
-            Field::new("c1", DataType::Int32, false),
-            Field::new("c2", DataType::Int32, false),
-        ]));
-        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
-
-        let schema_descr = get_test_schema_descr(vec![
-            ("c1", PhysicalType::INT32, None, None, None, None),
-            ("c2", PhysicalType::INT32, None, None, None, None),
-        ]);
-        let rgm1 = get_row_group_meta_data(
-            &schema_descr,
-            vec![
-                ParquetStatistics::int32(Some(1), Some(10), None, 0, false),
-                ParquetStatistics::int32(Some(1), Some(10), None, 0, false),
-            ],
-        );
-        let rgm2 = get_row_group_meta_data(
-            &schema_descr,
-            vec![
-                ParquetStatistics::int32(Some(11), Some(20), None, 0, false),
-                ParquetStatistics::int32(Some(11), Some(20), None, 0, false),
-            ],
-        );
-
-        let metrics = parquet_file_metrics();
-        let groups = &[rgm1, rgm2];
-        // the first row group is still filtered out because the predicate expression can be partially evaluated
-        // when conditions are joined using AND
-        assert_eq!(
-            prune_row_groups(groups, None, Some(pruning_predicate), &metrics),
-            vec![1]
-        );
-
-        // if conditions in predicate are joined with OR and an unsupported expression is used
-        // this bypasses the entire predicate expression and no row groups are filtered out
-        let expr = col("c1").gt(lit(15)).or(col("c2").modulus(lit(2)));
-        let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
-
-        assert_eq!(
-            prune_row_groups(groups, None, Some(pruning_predicate), &metrics),
-            vec![0, 1]
-        );
-    }
-
-    fn gen_row_group_meta_data_for_pruning_predicate() -> Vec<RowGroupMetaData> {
-        let schema_descr = get_test_schema_descr(vec![
-            ("c1", PhysicalType::INT32, None, None, None, None),
-            ("c2", PhysicalType::BOOLEAN, None, None, None, None),
-        ]);
-        let rgm1 = get_row_group_meta_data(
-            &schema_descr,
-            vec![
-                ParquetStatistics::int32(Some(1), Some(10), None, 0, false),
-                ParquetStatistics::boolean(Some(false), Some(true), None, 0, false),
-            ],
-        );
-        let rgm2 = get_row_group_meta_data(
-            &schema_descr,
-            vec![
-                ParquetStatistics::int32(Some(11), Some(20), None, 0, false),
-                ParquetStatistics::boolean(Some(false), Some(true), None, 1, false),
-            ],
-        );
-        vec![rgm1, rgm2]
-    }
-
-    #[test]
-    fn row_group_pruning_predicate_null_expr() {
-        use datafusion_expr::{col, lit};
-        // c1 > 15 AND c2 IS NULL => c1_max > 15 AND c2_null_count > 0
-        let expr = col("c1").gt(lit(15)).and(col("c2").is_null());
-        let schema = Arc::new(Schema::new(vec![
-            Field::new("c1", DataType::Int32, false),
-            Field::new("c2", DataType::Boolean, false),
-        ]));
-        let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
-        let groups = gen_row_group_meta_data_for_pruning_predicate();
-
-        let metrics = parquet_file_metrics();
-        // The first row group is filtered out because it contains no null values in "c2".
-        assert_eq!(
-            prune_row_groups(&groups, None, Some(pruning_predicate), &metrics),
-            vec![1]
-        );
-    }
-
-    #[test]
-    fn row_group_pruning_predicate_eq_null_expr() {
-        use datafusion_expr::{col, lit};
-        // test row group predicate with an unknown (Null) expr
-        //
-        // c1 > 15 AND c2 = NULL => c1_max > 15 AND null
-        let expr = col("c1")
-            .gt(lit(15))
-            .and(col("c2").eq(lit(ScalarValue::Boolean(None))));
-        let schema = Arc::new(Schema::new(vec![
-            Field::new("c1", DataType::Int32, false),
-            Field::new("c2", DataType::Boolean, false),
-        ]));
-        let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
-        let groups = gen_row_group_meta_data_for_pruning_predicate();
-
-        let metrics = parquet_file_metrics();
-        // bool = NULL always evaluates to NULL (and thus will not
-        // pass predicates). Ideally these should both be false.
-        assert_eq!(
-            prune_row_groups(&groups, None, Some(pruning_predicate), &metrics),
-            vec![1]
-        );
-    }
-
-    #[test]
-    fn row_group_pruning_predicate_decimal_type() {
-        // For the decimal data type, parquet can use `INT32`, `INT64`,
-        // `BYTE_ARRAY`, or `FIXED_LEN_BYTE_ARRAY` to store the data.
-        // Construct statistics of these physical types and filter them with a decimal predicate.
-
-        // INT32: c1 > 5, where c1 is decimal(9,2)
-        // The scalar value's type is decimal(9,2), so no cast is needed
-        let expr = col("c1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2)));
-        let schema =
-            Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 2), false)]);
-        let schema_descr = get_test_schema_descr(vec![(
-            "c1",
-            PhysicalType::INT32,
-            Some(LogicalType::Decimal {
-                scale: 2,
-                precision: 9,
-            }),
-            Some(9),
-            Some(2),
-            None,
-        )]);
-        let pruning_predicate =
-            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
-        let rgm1 = get_row_group_meta_data(
-            &schema_descr,
-            // [1.00, 6.00]
-            // c1 > 5, this row group will be included in the results.
-            vec![ParquetStatistics::int32(
-                Some(100),
-                Some(600),
-                None,
-                0,
-                false,
-            )],
-        );
-        let rgm2 = get_row_group_meta_data(
-            &schema_descr,
-            // [0.1, 0.2]
-            // c1 > 5, this row group will not be included in the results.
-            vec![ParquetStatistics::int32(Some(10), Some(20), None, 0, false)],
-        );
-        let metrics = parquet_file_metrics();
-        assert_eq!(
-            prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
-            vec![0]
-        );
-
-        // INT32: c1 > 5, but the parquet decimal type has a different precision or scale than the arrow decimal
-        // The c1 type is decimal(9,0) in the parquet file, and the scalar's type is decimal(5,2).
-        // Both sides should be converted to the coercion type, which is decimal(11,2)
-        let expr = cast(col("c1"), DataType::Decimal128(11, 2)).gt(cast(
-            lit(ScalarValue::Decimal128(Some(500), 5, 2)),
-            Decimal128(11, 2),
-        ));
-        let schema =
-            Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 0), false)]);
-        let schema_descr = get_test_schema_descr(vec![(
-            "c1",
-            PhysicalType::INT32,
-            Some(LogicalType::Decimal {
-                scale: 0,
-                precision: 9,
-            }),
-            Some(9),
-            Some(0),
-            None,
-        )]);
-        let pruning_predicate =
-            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
-        let rgm1 = get_row_group_meta_data(
-            &schema_descr,
-            // [100, 600]
-            // c1 > 5, this row group will be included in the results.
-            vec![ParquetStatistics::int32(
-                Some(100),
-                Some(600),
-                None,
-                0,
-                false,
-            )],
-        );
-        let rgm2 = get_row_group_meta_data(
-            &schema_descr,
-            // [10, 20]
-            // c1 > 5, this row group will be included in the results.
-            vec![ParquetStatistics::int32(Some(10), Some(20), None, 0, false)],
-        );
-        let rgm3 = get_row_group_meta_data(
-            &schema_descr,
-            // [0, 2]
-            // c1 > 5, this row group will not be included in the results.
-            vec![ParquetStatistics::int32(Some(0), Some(2), None, 0, false)],
-        );
-        let metrics = parquet_file_metrics();
-        assert_eq!(
-            prune_row_groups(
-                &[rgm1, rgm2, rgm3],
-                None,
-                Some(pruning_predicate),
-                &metrics
-            ),
-            vec![0, 1]
-        );
-
-        // INT64: c1 < 5, where c1 is decimal(18,2)
-        let expr = col("c1").lt(lit(ScalarValue::Decimal128(Some(500), 18, 2)));
-        let schema =
-            Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
-        let schema_descr = get_test_schema_descr(vec![(
-            "c1",
-            PhysicalType::INT64,
-            Some(LogicalType::Decimal {
-                scale: 2,
-                precision: 18,
-            }),
-            Some(18),
-            Some(2),
-            None,
-        )]);
-        let pruning_predicate =
-            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
-        let rgm1 = get_row_group_meta_data(
-            &schema_descr,
-            // [6.00, 8.00]
-            vec![ParquetStatistics::int32(
-                Some(600),
-                Some(800),
-                None,
-                0,
-                false,
-            )],
-        );
-        let rgm2 = get_row_group_meta_data(
-            &schema_descr,
-            // [0.1, 0.2]
-            vec![ParquetStatistics::int64(Some(10), Some(20), None, 0, false)],
-        );
-        let metrics = parquet_file_metrics();
-        assert_eq!(
-            prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
-            vec![1]
-        );
-
-        // FIXED_LEN_BYTE_ARRAY: c1 = decimal128(100000, 28, 3), where c1 is decimal(18,2)
-        // and the parquet type is also decimal(18,2)
-        let schema =
-            Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
-        // cast the type of c1 to decimal(28,3)
-        let left = cast(col("c1"), DataType::Decimal128(28, 3));
-        let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
-        let schema_descr = get_test_schema_descr(vec![(
-            "c1",
-            PhysicalType::FIXED_LEN_BYTE_ARRAY,
-            Some(LogicalType::Decimal {
-                scale: 2,
-                precision: 18,
-            }),
-            Some(18),
-            Some(2),
-            Some(16),
-        )]);
-        let pruning_predicate =
-            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
-        // the i128 must be encoded as big-endian bytes (Vec<u8>)
-        let rgm1 = get_row_group_meta_data(
-            &schema_descr,
-            vec![ParquetStatistics::fixed_len_byte_array(
-                // 5.00
-                Some(FixedLenByteArray::from(ByteArray::from(
-                    500i128.to_be_bytes().to_vec(),
-                ))),
-                // 80.00
-                Some(FixedLenByteArray::from(ByteArray::from(
-                    8000i128.to_be_bytes().to_vec(),
-                ))),
-                None,
-                0,
-                false,
-            )],
-        );
-        let rgm2 = get_row_group_meta_data(
-            &schema_descr,
-            vec![ParquetStatistics::fixed_len_byte_array(
-                // 5.00
-                Some(FixedLenByteArray::from(ByteArray::from(
-                    500i128.to_be_bytes().to_vec(),
-                ))),
-                // 200.00
-                Some(FixedLenByteArray::from(ByteArray::from(
-                    20000i128.to_be_bytes().to_vec(),
-                ))),
-                None,
-                0,
-                false,
-            )],
-        );
-        let metrics = parquet_file_metrics();
-        assert_eq!(
-            prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
-            vec![1]
-        );
-
-        // TODO: support reading decimals from parquet BYTE_ARRAY columns, after the 20.0.0 arrow-rs release
-    }
-
-    fn get_row_group_meta_data(
-        schema_descr: &SchemaDescPtr,
-        column_statistics: Vec<ParquetStatistics>,
-    ) -> RowGroupMetaData {
-        use parquet::file::metadata::ColumnChunkMetaData;
-        let mut columns = vec![];
-        for (i, s) in column_statistics.iter().enumerate() {
-            let column = ColumnChunkMetaData::builder(schema_descr.column(i))
-                .set_statistics(s.clone())
-                .build()
-                .unwrap();
-            columns.push(column);
-        }
-        RowGroupMetaData::builder(schema_descr.clone())
-            .set_num_rows(1000)
-            .set_total_byte_size(2000)
-            .set_column_metadata(columns)
-            .build()
-            .unwrap()
-    }
-
-    #[allow(clippy::type_complexity)]
-    fn get_test_schema_descr(
-        fields: Vec<(
-            &str,
-            PhysicalType,
-            Option<LogicalType>,
-            Option<i32>, // precision
-            Option<i32>, // scale
-            Option<i32>, // length of bytes
-        )>,
-    ) -> SchemaDescPtr {
-        use parquet::schema::types::{SchemaDescriptor, Type as SchemaType};
-        let mut schema_fields = fields
-            .iter()
-            .map(|(n, t, logical, precision, scale, length)| {
-                let mut builder = SchemaType::primitive_type_builder(n, *t);
-                // add logical type for the parquet field
-                match logical {
-                    None => {}
-                    Some(logical_type) => {
-                        builder = builder.with_logical_type(Some(logical_type.clone()));
-                    }
-                };
-                match precision {
-                    None => {}
-                    Some(v) => {
-                        builder = builder.with_precision(*v);
-                    }
-                };
-                match scale {
-                    None => {}
-                    Some(v) => {
-                        builder = builder.with_scale(*v);
-                    }
-                }
-                match length {
-                    None => {}
-                    Some(v) => {
-                        builder = builder.with_length(*v);
-                    }
-                }
-                Arc::new(builder.build().unwrap())
-            })
-            .collect::<Vec<_>>();
-        let schema = SchemaType::group_type_builder("schema")
-            .with_fields(&mut schema_fields)
-            .build()
-            .unwrap();
-
-        Arc::new(SchemaDescriptor::new(Arc::new(schema)))
-    }
-
     fn populate_csv_partitions(
         tmp_dir: &TempDir,
         partition_count: usize,
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs b/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs
new file mode 100644
index 000000000..d7cbb1984
--- /dev/null
+++ b/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs
@@ -0,0 +1,744 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::{
+    array::ArrayRef,
+    datatypes::{DataType, Schema},
+};
+use datafusion_common::Column;
+use datafusion_common::ScalarValue;
+use log::debug;
+
+use parquet::{
+    file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics},
+    schema::types::ColumnDescriptor,
+};
+
+use crate::{
+    datasource::listing::FileRange,
+    physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
+};
+use parquet::basic::{ConvertedType, LogicalType};
+
+use super::ParquetFileMetrics;
+
+pub(crate) fn prune_row_groups(
+    groups: &[RowGroupMetaData],
+    range: Option<FileRange>,
+    predicate: Option<PruningPredicate>,
+    metrics: &ParquetFileMetrics,
+) -> Vec<usize> {
+    // TODO: Columnar pruning
+    let mut filtered = Vec::with_capacity(groups.len());
+    for (idx, metadata) in groups.iter().enumerate() {
+        if let Some(range) = &range {
+            let offset = metadata.column(0).file_offset();
+            if offset < range.start || offset >= range.end {
+                continue;
+            }
+        }
+
+        if let Some(predicate) = &predicate {
+            let pruning_stats = RowGroupPruningStatistics {
+                row_group_metadata: metadata,
+                parquet_schema: predicate.schema().as_ref(),
+            };
+            match predicate.prune(&pruning_stats) {
+                Ok(values) => {
+                    // NB: false means don't scan row group
+                    if !values[0] {
+                        metrics.row_groups_pruned.add(1);
+                        continue;
+                    }
+                }
+                // the stats filter array could not be built;
+                // keep (do not prune) this row group
+                Err(e) => {
+                    debug!("Error evaluating row group predicate values {}", e);
+                    metrics.predicate_evaluation_errors.add(1);
+                }
+            }
+        }
+
+        filtered.push(idx)
+    }
+    filtered
+}
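+
+// Illustrative sketch (not part of this commit): a caller feeds the returned
+// indexes back into the reader builder, mirroring the call site in
+// `ParquetOpener`; the `builder` / `file_metrics` names and the
+// `with_row_groups` step are assumptions based on the parquet crate API.
+//
+//     let row_groups = prune_row_groups(
+//         builder.metadata().row_groups(),
+//         None, // no byte-range restriction on the file
+//         pruning_predicate.clone(),
+//         &file_metrics,
+//     );
+//     let stream = builder.with_row_groups(row_groups).build()?;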
+
+/// Wraps parquet statistics in a way
+/// that implements [`PruningStatistics`]
+struct RowGroupPruningStatistics<'a> {
+    row_group_metadata: &'a RowGroupMetaData,
+    parquet_schema: &'a Schema,
+}
+
+// TODO: consolidate code with arrow-rs
+// Converts a byte array to an i128.
+// The input byte array must be big-endian.
+// Copied from arrow-rs.
+fn from_bytes_to_i128(b: &[u8]) -> i128 {
+    assert!(b.len() <= 16, "Decimal128Array supports only up to size 16");
+    let first_bit = b[0] & 128u8 == 128u8;
+    let mut result = if first_bit { [255u8; 16] } else { [0u8; 16] };
+    for (i, v) in b.iter().enumerate() {
+        result[i + (16 - b.len())] = *v;
+    }
+    // The byte array comes from the parquet file and must be big-endian;
+    // the endianness is defined by the parquet format. See the reference document:
+    // https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66
+    i128::from_be_bytes(result)
+}
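+
+// Worked example (illustrative only, assuming the big-endian input described
+// above): short inputs are sign-extended into the 16-byte buffer.
+//
+//     assert_eq!(from_bytes_to_i128(&[0xFF]), -1);        // sign bit set
+//     assert_eq!(from_bytes_to_i128(&[0x01, 0x00]), 256); // big-endian order
+//     assert_eq!(from_bytes_to_i128(&(-500i128).to_be_bytes()), -500);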
+
+/// Extract the min/max statistics from a `ParquetStatistics` object
+macro_rules! get_statistic {
+    ($column_statistics:expr, $func:ident, $bytes_func:ident, $target_arrow_type:expr) => {{
+        if !$column_statistics.has_min_max_set() {
+            return None;
+        }
+        match $column_statistics {
+            ParquetStatistics::Boolean(s) => Some(ScalarValue::Boolean(Some(*s.$func()))),
+            ParquetStatistics::Int32(s) => {
+                match $target_arrow_type {
+                    // int32 to decimal with the precision and scale
+                    Some(DataType::Decimal128(precision, scale)) => {
+                        Some(ScalarValue::Decimal128(
+                            Some(*s.$func() as i128),
+                            precision,
+                            scale,
+                        ))
+                    }
+                    _ => Some(ScalarValue::Int32(Some(*s.$func()))),
+                }
+            }
+            ParquetStatistics::Int64(s) => {
+                match $target_arrow_type {
+                    // int64 to decimal with the precision and scale
+                    Some(DataType::Decimal128(precision, scale)) => {
+                        Some(ScalarValue::Decimal128(
+                            Some(*s.$func() as i128),
+                            precision,
+                            scale,
+                        ))
+                    }
+                    _ => Some(ScalarValue::Int64(Some(*s.$func()))),
+                }
+            }
+            // 96 bit ints not supported
+            ParquetStatistics::Int96(_) => None,
+            ParquetStatistics::Float(s) => Some(ScalarValue::Float32(Some(*s.$func()))),
+            ParquetStatistics::Double(s) => Some(ScalarValue::Float64(Some(*s.$func()))),
+            ParquetStatistics::ByteArray(s) => {
+                // TODO support decimal type for byte array type
+                let s = std::str::from_utf8(s.$bytes_func())
+                    .map(|s| s.to_string())
+                    .ok();
+                Some(ScalarValue::Utf8(s))
+            }
+            // type not supported yet
+            ParquetStatistics::FixedLenByteArray(s) => {
+                match $target_arrow_type {
+                    // just support the decimal data type
+                    Some(DataType::Decimal128(precision, scale)) => {
+                        Some(ScalarValue::Decimal128(
+                            Some(from_bytes_to_i128(s.$bytes_func())),
+                            precision,
+                            scale,
+                        ))
+                    }
+                    _ => None,
+                }
+            }
+        }
+    }};
+}
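+
+// For example (illustrative only): for an INT32 column whose arrow type is
+// Decimal128(9, 2), a stored min of 100 is read back as
+// `ScalarValue::Decimal128(Some(100), 9, 2)` (i.e. 1.00); without a decimal
+// target type it would stay `ScalarValue::Int32(Some(100))`.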
+
+// Extract the min or max value by calling `func` or `bytes_func` on the ParquetStatistics, as appropriate
+macro_rules! get_min_max_values {
+    ($self:expr, $column:expr, $func:ident, $bytes_func:ident) => {{
+        let (_column_index, field) =
+            if let Some((v, f)) = $self.parquet_schema.column_with_name(&$column.name) {
+                (v, f)
+            } else {
+                // Named column was not present
+                return None;
+            };
+
+        let data_type = field.data_type();
+        // The result may be None because DataFusion doesn't support ScalarValues of the column type
+        let null_scalar: ScalarValue = data_type.try_into().ok()?;
+
+        $self.row_group_metadata
+            .columns()
+            .iter()
+            .find(|c| c.column_descr().name() == &$column.name)
+            .and_then(|c| if c.statistics().is_some() {Some((c.statistics().unwrap(), c.column_descr()))} else {None})
+            .map(|(stats, column_descr)|
+                {
+                    let target_data_type = parquet_to_arrow_decimal_type(column_descr);
+                    get_statistic!(stats, $func, $bytes_func, target_data_type)
+                })
+            .flatten()
+            // column either didn't have statistics at all or didn't have min/max values
+            .or_else(|| Some(null_scalar.clone()))
+            .map(|s| s.to_array())
+    }}
+}
+
+// Extract the null count value on the ParquetStatistics
+macro_rules! get_null_count_values {
+    ($self:expr, $column:expr) => {{
+        let value = ScalarValue::UInt64(
+            if let Some(col) = $self
+                .row_group_metadata
+                .columns()
+                .iter()
+                .find(|c| c.column_descr().name() == &$column.name)
+            {
+                col.statistics().map(|s| s.null_count())
+            } else {
+                Some($self.row_group_metadata.num_rows() as u64)
+            },
+        );
+
+        Some(value.to_array())
+    }};
+}
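+
+// Sketch of the resulting semantics (illustrative only):
+//
+//     // column present in the row group: Some(stats.null_count())
+//     // column missing entirely:         Some(num_rows as u64), i.e. "all null"
+//
+// Reporting a missing column as all-null lets IsNull-style predicates keep
+// pruning correctly.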
+
+// Convert the parquet column schema to an arrow data type, considering only
+// the decimal data type.
+fn parquet_to_arrow_decimal_type(parquet_column: &ColumnDescriptor) -> Option<DataType> {
+    let type_ptr = parquet_column.self_type_ptr();
+    match type_ptr.get_basic_info().logical_type() {
+        Some(LogicalType::Decimal { scale, precision }) => {
+            Some(DataType::Decimal128(precision as u8, scale as u8))
+        }
+        _ => match type_ptr.get_basic_info().converted_type() {
+            ConvertedType::DECIMAL => Some(DataType::Decimal128(
+                type_ptr.get_precision() as u8,
+                type_ptr.get_scale() as u8,
+            )),
+            _ => None,
+        },
+    }
+}
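+
+// For example (illustrative only): a column declared with
+// `LogicalType::Decimal { precision: 9, scale: 2 }` maps to
+// `Some(DataType::Decimal128(9, 2))`; a legacy `ConvertedType::DECIMAL`
+// column falls back to the precision/scale recorded on the type; anything
+// else yields `None`.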
+
+impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
+    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
+        get_min_max_values!(self, column, min, min_bytes)
+    }
+
+    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
+        get_min_max_values!(self, column, max, max_bytes)
+    }
+
+    fn num_containers(&self) -> usize {
+        1
+    }
+
+    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
+        get_null_count_values!(self, column)
+    }
+}
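+
+// Because `num_containers` is 1, each `RowGroupPruningStatistics` describes a
+// single row group, so `PruningPredicate::prune` returns a one-element vector
+// and `prune_row_groups` above only inspects `values[0]`. A minimal sketch
+// (illustrative only):
+//
+//     let keep = predicate.prune(&pruning_stats)?; // Vec<bool>, len == 1
+//     if !keep[0] { /* skip this row group */ }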
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
+    use arrow::datatypes::DataType::Decimal128;
+    use arrow::datatypes::Schema;
+    use arrow::datatypes::{DataType, Field};
+    use datafusion_expr::{cast, col, lit};
+    use parquet::basic::LogicalType;
+    use parquet::data_type::{ByteArray, FixedLenByteArray};
+    use parquet::{
+        basic::Type as PhysicalType,
+        file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics},
+        schema::types::SchemaDescPtr,
+    };
+    use std::sync::Arc;
+
+    #[test]
+    fn row_group_pruning_predicate_simple_expr() {
+        use datafusion_expr::{col, lit};
+        // c1 > 15 => c1_max > 15
+        let expr = col("c1").gt(lit(15));
+        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
+        let pruning_predicate =
+            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        let schema_descr = get_test_schema_descr(vec![(
+            "c1",
+            PhysicalType::INT32,
+            None,
+            None,
+            None,
+            None,
+        )]);
+        let rgm1 = get_row_group_meta_data(
+            &schema_descr,
+            vec![ParquetStatistics::int32(Some(1), Some(10), None, 0, false)],
+        );
+        let rgm2 = get_row_group_meta_data(
+            &schema_descr,
+            vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)],
+        );
+
+        let metrics = parquet_file_metrics();
+        assert_eq!(
+            prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
+            vec![1]
+        );
+    }
+
+    #[test]
+    fn row_group_pruning_predicate_missing_stats() {
+        use datafusion_expr::{col, lit};
+        // c1 > 15 => c1_max > 15
+        let expr = col("c1").gt(lit(15));
+        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
+        let pruning_predicate =
+            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+
+        let schema_descr = get_test_schema_descr(vec![(
+            "c1",
+            PhysicalType::INT32,
+            None,
+            None,
+            None,
+            None,
+        )]);
+        let rgm1 = get_row_group_meta_data(
+            &schema_descr,
+            vec![ParquetStatistics::int32(None, None, None, 0, false)],
+        );
+        let rgm2 = get_row_group_meta_data(
+            &schema_descr,
+            vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)],
+        );
+        let metrics = parquet_file_metrics();
+        // missing statistics for the first row group mean that the result of the predicate
+        // expression is null / undefined, so the first row group can't be filtered out
+        assert_eq!(
+            prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
+            vec![0, 1]
+        );
+    }
+
+    #[test]
+    fn row_group_pruning_predicate_partial_expr() {
+        use datafusion_expr::{col, lit};
+        // test row group predicate with partially supported expression
+        // c1 > 15 AND c2 % 2 => c1_max > 15 AND true
+        let expr = col("c1").gt(lit(15)).and(col("c2").modulus(lit(2)));
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("c1", DataType::Int32, false),
+            Field::new("c2", DataType::Int32, false),
+        ]));
+        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
+
+        let schema_descr = get_test_schema_descr(vec![
+            ("c1", PhysicalType::INT32, None, None, None, None),
+            ("c2", PhysicalType::INT32, None, None, None, None),
+        ]);
+        let rgm1 = get_row_group_meta_data(
+            &schema_descr,
+            vec![
+                ParquetStatistics::int32(Some(1), Some(10), None, 0, false),
+                ParquetStatistics::int32(Some(1), Some(10), None, 0, false),
+            ],
+        );
+        let rgm2 = get_row_group_meta_data(
+            &schema_descr,
+            vec![
+                ParquetStatistics::int32(Some(11), Some(20), None, 0, false),
+                ParquetStatistics::int32(Some(11), Some(20), None, 0, false),
+            ],
+        );
+
+        let metrics = parquet_file_metrics();
+        let groups = &[rgm1, rgm2];
+        // the first row group is still filtered out because the predicate expression can be partially evaluated
+        // when conditions are joined using AND
+        assert_eq!(
+            prune_row_groups(groups, None, Some(pruning_predicate), &metrics),
+            vec![1]
+        );
+
+        // if conditions in predicate are joined with OR and an unsupported expression is used
+        // this bypasses the entire predicate expression and no row groups are filtered out
+        let expr = col("c1").gt(lit(15)).or(col("c2").modulus(lit(2)));
+        let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
+
+        assert_eq!(
+            prune_row_groups(groups, None, Some(pruning_predicate), &metrics),
+            vec![0, 1]
+        );
+    }
+
+    fn gen_row_group_meta_data_for_pruning_predicate() -> Vec<RowGroupMetaData> {
+        let schema_descr = get_test_schema_descr(vec![
+            ("c1", PhysicalType::INT32, None, None, None, None),
+            ("c2", PhysicalType::BOOLEAN, None, None, None, None),
+        ]);
+        let rgm1 = get_row_group_meta_data(
+            &schema_descr,
+            vec![
+                ParquetStatistics::int32(Some(1), Some(10), None, 0, false),
+                ParquetStatistics::boolean(Some(false), Some(true), None, 0, false),
+            ],
+        );
+        let rgm2 = get_row_group_meta_data(
+            &schema_descr,
+            vec![
+                ParquetStatistics::int32(Some(11), Some(20), None, 0, false),
+                ParquetStatistics::boolean(Some(false), Some(true), None, 1, false),
+            ],
+        );
+        vec![rgm1, rgm2]
+    }
+
+    #[test]
+    fn row_group_pruning_predicate_null_expr() {
+        use datafusion_expr::{col, lit};
+        // c1 > 15 AND c2 IS NULL => c1_max > 15 AND c2_null_count > 0
+        let expr = col("c1").gt(lit(15)).and(col("c2").is_null());
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("c1", DataType::Int32, false),
+            Field::new("c2", DataType::Boolean, false),
+        ]));
+        let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
+        let groups = gen_row_group_meta_data_for_pruning_predicate();
+
+        let metrics = parquet_file_metrics();
+        // The first row group is filtered out because it contains no null values in "c2".
+        assert_eq!(
+            prune_row_groups(&groups, None, Some(pruning_predicate), &metrics),
+            vec![1]
+        );
+    }
+
+    #[test]
+    fn row_group_pruning_predicate_eq_null_expr() {
+        use datafusion_expr::{col, lit};
+        // test row group predicate with an unknown (Null) expr
+        //
+        // c1 > 15 AND c2 = NULL => c1_max > 15 AND null
+        let expr = col("c1")
+            .gt(lit(15))
+            .and(col("c2").eq(lit(ScalarValue::Boolean(None))));
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("c1", DataType::Int32, false),
+            Field::new("c2", DataType::Boolean, false),
+        ]));
+        let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
+        let groups = gen_row_group_meta_data_for_pruning_predicate();
+
+        let metrics = parquet_file_metrics();
+        // bool = NULL always evaluates to NULL (and thus will not
+        // pass predicates). Ideally these should both be false.
+        assert_eq!(
+            prune_row_groups(&groups, None, Some(pruning_predicate), &metrics),
+            vec![1]
+        );
+    }
+
+    #[test]
+    fn row_group_pruning_predicate_decimal_type() {
+        // For the decimal data type, parquet can use `INT32`, `INT64`,
+        // `BYTE_ARRAY`, or `FIXED_LEN_BYTE_ARRAY` to store the data.
+        // Construct statistics of these physical types and filter them with a decimal predicate.
+
+        // INT32: c1 > 5, where c1 is decimal(9,2)
+        // The scalar value's type is decimal(9,2), so no cast is needed
+        let expr = col("c1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2)));
+        let schema =
+            Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 2), false)]);
+        let schema_descr = get_test_schema_descr(vec![(
+            "c1",
+            PhysicalType::INT32,
+            Some(LogicalType::Decimal {
+                scale: 2,
+                precision: 9,
+            }),
+            Some(9),
+            Some(2),
+            None,
+        )]);
+        let pruning_predicate =
+            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        let rgm1 = get_row_group_meta_data(
+            &schema_descr,
+            // [1.00, 6.00]
+            // c1 > 5, this row group will be included in the results.
+            vec![ParquetStatistics::int32(
+                Some(100),
+                Some(600),
+                None,
+                0,
+                false,
+            )],
+        );
+        let rgm2 = get_row_group_meta_data(
+            &schema_descr,
+            // [0.1, 0.2]
+            // c1 > 5, this row group will not be included in the results.
+            vec![ParquetStatistics::int32(Some(10), Some(20), None, 0, false)],
+        );
+        let metrics = parquet_file_metrics();
+        assert_eq!(
+            prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
+            vec![0]
+        );
+
+        // INT32: c1 > 5, but the parquet decimal type has a different precision or scale than the arrow decimal
+        // The c1 type is decimal(9,0) in the parquet file, and the scalar's type is decimal(5,2).
+        // Both sides should be converted to the coercion type, which is decimal(11,2)
+        let expr = cast(col("c1"), DataType::Decimal128(11, 2)).gt(cast(
+            lit(ScalarValue::Decimal128(Some(500), 5, 2)),
+            Decimal128(11, 2),
+        ));
+        let schema =
+            Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 0), false)]);
+        let schema_descr = get_test_schema_descr(vec![(
+            "c1",
+            PhysicalType::INT32,
+            Some(LogicalType::Decimal {
+                scale: 0,
+                precision: 9,
+            }),
+            Some(9),
+            Some(0),
+            None,
+        )]);
+        let pruning_predicate =
+            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        let rgm1 = get_row_group_meta_data(
+            &schema_descr,
+            // [100, 600]
+            // c1 > 5, this row group will be included in the results.
+            vec![ParquetStatistics::int32(
+                Some(100),
+                Some(600),
+                None,
+                0,
+                false,
+            )],
+        );
+        let rgm2 = get_row_group_meta_data(
+            &schema_descr,
+            // [10, 20]
+            // c1 > 5, this row group will be included in the results.
+            vec![ParquetStatistics::int32(Some(10), Some(20), None, 0, false)],
+        );
+        let rgm3 = get_row_group_meta_data(
+            &schema_descr,
+            // [0, 2]
+            // c1 > 5, this row group will not be included in the results.
+            vec![ParquetStatistics::int32(Some(0), Some(2), None, 0, false)],
+        );
+        let metrics = parquet_file_metrics();
+        assert_eq!(
+            prune_row_groups(
+                &[rgm1, rgm2, rgm3],
+                None,
+                Some(pruning_predicate),
+                &metrics
+            ),
+            vec![0, 1]
+        );
+
+        // INT64: c1 < 5, where c1 is decimal(18,2)
+        let expr = col("c1").lt(lit(ScalarValue::Decimal128(Some(500), 18, 2)));
+        let schema =
+            Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
+        let schema_descr = get_test_schema_descr(vec![(
+            "c1",
+            PhysicalType::INT64,
+            Some(LogicalType::Decimal {
+                scale: 2,
+                precision: 18,
+            }),
+            Some(18),
+            Some(2),
+            None,
+        )]);
+        let pruning_predicate =
+            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        let rgm1 = get_row_group_meta_data(
+            &schema_descr,
+            // [6.00, 8.00]
+            vec![ParquetStatistics::int32(
+                Some(600),
+                Some(800),
+                None,
+                0,
+                false,
+            )],
+        );
+        let rgm2 = get_row_group_meta_data(
+            &schema_descr,
+            // [0.1, 0.2]
+            vec![ParquetStatistics::int64(Some(10), Some(20), None, 0, false)],
+        );
+        let metrics = parquet_file_metrics();
+        assert_eq!(
+            prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
+            vec![1]
+        );
+
+        // FIXED_LEN_BYTE_ARRAY: c1 = decimal128(100000, 28, 3), where c1 is decimal(18,2)
+        // and the parquet type is also decimal(18,2)
+        let schema =
+            Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
+        // cast the type of c1 to decimal(28,3)
+        let left = cast(col("c1"), DataType::Decimal128(28, 3));
+        let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
+        let schema_descr = get_test_schema_descr(vec![(
+            "c1",
+            PhysicalType::FIXED_LEN_BYTE_ARRAY,
+            Some(LogicalType::Decimal {
+                scale: 2,
+                precision: 18,
+            }),
+            Some(18),
+            Some(2),
+            Some(16),
+        )]);
+        let pruning_predicate =
+            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        // the i128 must be encoded as big-endian bytes (Vec<u8>)
+        let rgm1 = get_row_group_meta_data(
+            &schema_descr,
+            vec![ParquetStatistics::fixed_len_byte_array(
+                // 5.00
+                Some(FixedLenByteArray::from(ByteArray::from(
+                    500i128.to_be_bytes().to_vec(),
+                ))),
+                // 80.00
+                Some(FixedLenByteArray::from(ByteArray::from(
+                    8000i128.to_be_bytes().to_vec(),
+                ))),
+                None,
+                0,
+                false,
+            )],
+        );
+        let rgm2 = get_row_group_meta_data(
+            &schema_descr,
+            vec![ParquetStatistics::fixed_len_byte_array(
+                // 5.00
+                Some(FixedLenByteArray::from(ByteArray::from(
+                    500i128.to_be_bytes().to_vec(),
+                ))),
+                // 200.00
+                Some(FixedLenByteArray::from(ByteArray::from(
+                    20000i128.to_be_bytes().to_vec(),
+                ))),
+                None,
+                0,
+                false,
+            )],
+        );
+        let metrics = parquet_file_metrics();
+        assert_eq!(
+            prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
+            vec![1]
+        );
+
+        // TODO: support reading decimals from parquet BYTE_ARRAY columns, after the 20.0.0 arrow-rs release
+    }
+
+    fn get_row_group_meta_data(
+        schema_descr: &SchemaDescPtr,
+        column_statistics: Vec<ParquetStatistics>,
+    ) -> RowGroupMetaData {
+        use parquet::file::metadata::ColumnChunkMetaData;
+        let mut columns = vec![];
+        for (i, s) in column_statistics.iter().enumerate() {
+            let column = ColumnChunkMetaData::builder(schema_descr.column(i))
+                .set_statistics(s.clone())
+                .build()
+                .unwrap();
+            columns.push(column);
+        }
+        RowGroupMetaData::builder(schema_descr.clone())
+            .set_num_rows(1000)
+            .set_total_byte_size(2000)
+            .set_column_metadata(columns)
+            .build()
+            .unwrap()
+    }
+
+    #[allow(clippy::type_complexity)]
+    fn get_test_schema_descr(
+        fields: Vec<(
+            &str,
+            PhysicalType,
+            Option<LogicalType>,
+            Option<i32>, // precision
+            Option<i32>, // scale
+            Option<i32>, // length of bytes
+        )>,
+    ) -> SchemaDescPtr {
+        use parquet::schema::types::{SchemaDescriptor, Type as SchemaType};
+        let mut schema_fields = fields
+            .iter()
+            .map(|(n, t, logical, precision, scale, length)| {
+                let mut builder = SchemaType::primitive_type_builder(n, *t);
+                // add logical type for the parquet field
+                match logical {
+                    None => {}
+                    Some(logical_type) => {
+                        builder = builder.with_logical_type(Some(logical_type.clone()));
+                    }
+                };
+                match precision {
+                    None => {}
+                    Some(v) => {
+                        builder = builder.with_precision(*v);
+                    }
+                };
+                match scale {
+                    None => {}
+                    Some(v) => {
+                        builder = builder.with_scale(*v);
+                    }
+                }
+                match length {
+                    None => {}
+                    Some(v) => {
+                        builder = builder.with_length(*v);
+                    }
+                }
+                Arc::new(builder.build().unwrap())
+            })
+            .collect::<Vec<_>>();
+        let schema = SchemaType::group_type_builder("schema")
+            .with_fields(&mut schema_fields)
+            .build()
+            .unwrap();
+
+        Arc::new(SchemaDescriptor::new(Arc::new(schema)))
+    }
+
+    fn parquet_file_metrics() -> ParquetFileMetrics {
+        let metrics = Arc::new(ExecutionPlanMetricsSet::new());
+        ParquetFileMetrics::new(0, "file.parquet", &metrics)
+    }
+}