Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/22 12:07:19 UTC

[arrow-datafusion] branch master updated: Support parquet page filtering on min_max for `decimal128` and `string` columns (#4255)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new d7a7fb61a Support parquet page filtering on min_max for `decimal128` and `string` columns  (#4255)
d7a7fb61a is described below

commit d7a7fb61afe9ce2824aae737f65aec12d9513f7f
Author: Yang Jiang <ya...@ebay.com>
AuthorDate: Tue Nov 22 20:07:13 2022 +0800

    Support parquet page filtering on min_max for `decimal128` and `string` columns  (#4255)
    
    * Support parquet page filtering for string columns
    
    Signed-off-by: yangjiang <ya...@ebay.com>
    
    * Support parquet page filtering on min_max for decimal128 columns
    
    Signed-off-by: yangjiang <ya...@ebay.com>
    
    * Update datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
    
    * Avoid unwrap
    
    Signed-off-by: yangjiang <ya...@ebay.com>
    
    * reorg test code
    
    Signed-off-by: yangjiang <ya...@ebay.com>
    
    * add test for page index
    
    Signed-off-by: yangjiang <ya...@ebay.com>
    
    * fix comment
    
    Signed-off-by: yangjiang <ya...@ebay.com>
    
    * remove code
    
    Signed-off-by: yangjiang <ya...@ebay.com>
    
    Signed-off-by: yangjiang <ya...@ebay.com>
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
 .../core/src/physical_plan/file_format/parquet.rs  |  41 +-
 .../file_format/parquet/page_filter.rs             |  90 +++-
 .../file_format/parquet/row_groups.rs              |  44 +-
 datafusion/core/tests/parquet/filter_pushdown.rs   |  53 +-
 datafusion/core/tests/parquet/mod.rs               | 538 +++++++++++++++++++++
 datafusion/core/tests/parquet/page_pruning.rs      | 512 +++++++++++++++++++-
 datafusion/core/tests/parquet/row_group_pruning.rs | 500 +------------------
 test-utils/src/data_gen.rs                         |  13 +-
 8 files changed, 1233 insertions(+), 558 deletions(-)

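For orientation, a minimal sketch (not part of this commit) of the kind of query this change allows to be pruned at the page level. It only uses APIs that appear in the diff below (SessionConfig, OPT_PARQUET_ENABLE_PAGE_INDEX, register_parquet); the "data.parquet" path and the decimal_col column name are placeholder assumptions.

use datafusion::config::OPT_PARQUET_ENABLE_PAGE_INDEX;
use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};

async fn query_with_page_index() -> datafusion::error::Result<()> {
    // Enable reading the parquet page index so page-level pruning can run.
    let config = SessionConfig::new();
    config
        .config_options
        .write()
        .set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, true);
    let ctx = SessionContext::with_config(config);

    // "data.parquet" is assumed to contain a decimal(9,2) column named decimal_col.
    ctx.register_parquet("t", "data.parquet", ParquetReadOptions::default())
        .await?;

    // Data pages whose [min, max] statistics exclude the predicate are skipped.
    let df = ctx.sql("SELECT * FROM t WHERE decimal_col < 4").await?;
    df.show().await?;
    Ok(())
}
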
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 2ba8203a5..fa68f3072 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -17,7 +17,7 @@
 
 //! Execution plan for reading Parquet files
 
-use arrow::datatypes::SchemaRef;
+use arrow::datatypes::{DataType, SchemaRef};
 use fmt::Debug;
 use std::any::Any;
 use std::fmt;
@@ -55,8 +55,10 @@ use object_store::{ObjectMeta, ObjectStore};
 use parquet::arrow::arrow_reader::ArrowReaderOptions;
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask};
+use parquet::basic::{ConvertedType, LogicalType};
 use parquet::errors::ParquetError;
 use parquet::file::{metadata::ParquetMetaData, properties::WriterProperties};
+use parquet::schema::types::ColumnDescriptor;
 
 mod metrics;
 mod page_filter;
@@ -674,6 +676,43 @@ pub async fn plan_to_parquet(
     }
 }
 
+// TODO: consolidate code with arrow-rs
+// Convert the byte array to i128.
+// The input byte array must be big-endian.
+// Copied from arrow-rs.
+pub(crate) fn from_bytes_to_i128(b: &[u8]) -> i128 {
+    assert!(b.len() <= 16, "Decimal128Array supports only up to size 16");
+    let first_bit = b[0] & 128u8 == 128u8;
+    let mut result = if first_bit { [255u8; 16] } else { [0u8; 16] };
+    for (i, v) in b.iter().enumerate() {
+        result[i + (16 - b.len())] = *v;
+    }
+    // The byte array comes from the parquet file and must be big-endian.
+    // The endianness is defined by the parquet format; see the reference document:
+    // https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66
+    i128::from_be_bytes(result)
+}
+
+// Convert a parquet column schema to an arrow data type, considering only the
+// decimal data type.
+pub(crate) fn parquet_to_arrow_decimal_type(
+    parquet_column: &ColumnDescriptor,
+) -> Option<DataType> {
+    let type_ptr = parquet_column.self_type_ptr();
+    match type_ptr.get_basic_info().logical_type() {
+        Some(LogicalType::Decimal { scale, precision }) => {
+            Some(DataType::Decimal128(precision as u8, scale as u8))
+        }
+        _ => match type_ptr.get_basic_info().converted_type() {
+            ConvertedType::DECIMAL => Some(DataType::Decimal128(
+                type_ptr.get_precision() as u8,
+                type_ptr.get_scale() as u8,
+            )),
+            _ => None,
+        },
+    }
+}
+
 #[cfg(test)]
 mod tests {
     // See also `parquet_exec` integration test
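
To make the byte layout concrete, a hypothetical unit test for the helper moved above (not part of this commit; it assumes access to the crate-private from_bytes_to_i128):

#[test]
fn big_endian_bytes_to_i128() {
    // 0x04 0xD2 is big-endian for 1234; at decimal scale 2 that is the value 12.34.
    assert_eq!(from_bytes_to_i128(&[0x04u8, 0xD2]), 1234i128);
    // The sign bit of the first byte is extended: 0xFF 0x2E encodes -210 (-2.10 at scale 2).
    assert_eq!(from_bytes_to_i128(&[0xFFu8, 0x2E]), -210i128);
}
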
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
index 5f31a6a49..a1cf03666 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
@@ -17,11 +17,16 @@
 
 //! Contains code to filter entire pages
 
-use arrow::array::{BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array};
+use arrow::array::{
+    BooleanArray, Decimal128Array, Float32Array, Float64Array, Int32Array, Int64Array,
+    StringArray,
+};
+use arrow::datatypes::DataType;
 use arrow::{array::ArrayRef, datatypes::SchemaRef, error::ArrowError};
 use datafusion_common::{Column, DataFusionError, Result};
 use datafusion_optimizer::utils::split_conjunction;
 use log::{debug, error, trace};
+use parquet::schema::types::ColumnDescriptor;
 use parquet::{
     arrow::arrow_reader::{RowSelection, RowSelector},
     errors::ParquetError,
@@ -35,6 +40,9 @@ use std::collections::VecDeque;
 use std::sync::Arc;
 
 use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
+use crate::physical_plan::file_format::parquet::{
+    from_bytes_to_i128, parquet_to_arrow_decimal_type,
+};
 
 use super::metrics::ParquetFileMetrics;
 
@@ -132,6 +140,7 @@ pub(crate) fn build_page_filter(
                             &predicate,
                             rg_offset_indexes.get(col_id),
                             rg_page_indexes.get(col_id),
+                            groups[*r].column(col_id).column_descr(),
                             file_metrics,
                         )
                         .map_err(|e| {
@@ -305,15 +314,18 @@ fn prune_pages_in_one_row_group(
     predicate: &PruningPredicate,
     col_offset_indexes: Option<&Vec<PageLocation>>,
     col_page_indexes: Option<&Index>,
+    col_desc: &ColumnDescriptor,
     metrics: &ParquetFileMetrics,
 ) -> Result<Vec<RowSelector>> {
     let num_rows = group.num_rows() as usize;
     if let (Some(col_offset_indexes), Some(col_page_indexes)) =
         (col_offset_indexes, col_page_indexes)
     {
+        let target_type = parquet_to_arrow_decimal_type(col_desc);
         let pruning_stats = PagesPruningStatistics {
             col_page_indexes,
             col_offset_indexes,
+            target_type: &target_type,
         };
 
         match predicate.prune(&pruning_stats) {
@@ -382,6 +394,9 @@ fn create_row_count_in_each_page(
 struct PagesPruningStatistics<'a> {
     col_page_indexes: &'a Index,
     col_offset_indexes: &'a Vec<PageLocation>,
+    // target_type is the logical type from the schema (e.g. 'DECIMAL'), while the
+    // actual physical type in the parquet file may be `INT32`, `INT64`, or `FIXED_LEN_BYTE_ARRAY`
+    target_type: &'a Option<DataType>,
 }
 
 // Extract the min or max value calling `func` from the page index
@@ -390,16 +405,48 @@ macro_rules! get_min_max_values_for_page_index {
         match $self.col_page_indexes {
             Index::NONE => None,
             Index::INT32(index) => {
-                let vec = &index.indexes;
-                Some(Arc::new(Int32Array::from_iter(
-                    vec.iter().map(|x| x.$func().cloned()),
-                )))
+                match $self.target_type {
+                    // int32 to decimal with the precision and scale
+                    Some(DataType::Decimal128(precision, scale)) => {
+                        let vec = &index.indexes;
+                        let vec: Vec<Option<i128>> = vec
+                            .iter()
+                            .map(|x| x.$func().and_then(|x| Some(*x as i128)))
+                            .collect();
+                        Decimal128Array::from(vec)
+                            .with_precision_and_scale(*precision, *scale)
+                            .ok()
+                            .map(|arr| Arc::new(arr) as ArrayRef)
+                    }
+                    _ => {
+                        let vec = &index.indexes;
+                        Some(Arc::new(Int32Array::from_iter(
+                            vec.iter().map(|x| x.$func().cloned()),
+                        )))
+                    }
+                }
             }
             Index::INT64(index) => {
-                let vec = &index.indexes;
-                Some(Arc::new(Int64Array::from_iter(
-                    vec.iter().map(|x| x.$func().cloned()),
-                )))
+                match $self.target_type {
+                    // int64 to decimal with the precision and scale
+                    Some(DataType::Decimal128(precision, scale)) => {
+                        let vec = &index.indexes;
+                        let vec: Vec<Option<i128>> = vec
+                            .iter()
+                            .map(|x| x.$func().and_then(|x| Some(*x as i128)))
+                            .collect();
+                        Decimal128Array::from(vec)
+                            .with_precision_and_scale(*precision, *scale)
+                            .ok()
+                            .map(|arr| Arc::new(arr) as ArrayRef)
+                    }
+                    _ => {
+                        let vec = &index.indexes;
+                        Some(Arc::new(Int64Array::from_iter(
+                            vec.iter().map(|x| x.$func().cloned()),
+                        )))
+                    }
+                }
             }
             Index::FLOAT(index) => {
                 let vec = &index.indexes;
@@ -419,10 +466,33 @@ macro_rules! get_min_max_values_for_page_index {
                     vec.iter().map(|x| x.$func().cloned()),
                 )))
             }
-            Index::INT96(_) | Index::BYTE_ARRAY(_) | Index::FIXED_LEN_BYTE_ARRAY(_) => {
+            Index::BYTE_ARRAY(index) => {
+                let vec = &index.indexes;
+                let array: StringArray = vec
+                    .iter()
+                    .map(|x| x.$func())
+                    .map(|x| x.and_then(|x| std::str::from_utf8(x).ok()))
+                    .collect();
+                Some(Arc::new(array))
+            }
+            Index::INT96(_) => {
                // TODO: support this type
                 None
             }
+            Index::FIXED_LEN_BYTE_ARRAY(index) => match $self.target_type {
+                Some(DataType::Decimal128(precision, scale)) => {
+                    let vec = &index.indexes;
+                    Decimal128Array::from(
+                        vec.iter()
+                            .map(|x| x.$func().and_then(|x| Some(from_bytes_to_i128(x))))
+                            .collect::<Vec<Option<i128>>>(),
+                    )
+                    .with_precision_and_scale(*precision, *scale)
+                    .ok()
+                    .map(|arr| Arc::new(arr) as ArrayRef)
+                }
+                _ => None,
+            },
         }
     }};
 }
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs b/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs
index dab2a4226..0f0b37807 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs
@@ -23,16 +23,17 @@ use datafusion_common::Column;
 use datafusion_common::ScalarValue;
 use log::debug;
 
-use parquet::{
-    file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics},
-    schema::types::ColumnDescriptor,
+use parquet::file::{
+    metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics,
 };
 
+use crate::physical_plan::file_format::parquet::{
+    from_bytes_to_i128, parquet_to_arrow_decimal_type,
+};
 use crate::{
     datasource::listing::FileRange,
     physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
 };
-use parquet::basic::{ConvertedType, LogicalType};
 
 use super::ParquetFileMetrics;
 
@@ -85,23 +86,6 @@ struct RowGroupPruningStatistics<'a> {
     parquet_schema: &'a Schema,
 }
 
-// TODO: consolidate code with arrow-rs
-// Convert the bytes array to i128.
-// The endian of the input bytes array must be big-endian.
-// Copy from the arrow-rs
-fn from_bytes_to_i128(b: &[u8]) -> i128 {
-    assert!(b.len() <= 16, "Decimal128Array supports only up to size 16");
-    let first_bit = b[0] & 128u8 == 128u8;
-    let mut result = if first_bit { [255u8; 16] } else { [0u8; 16] };
-    for (i, v) in b.iter().enumerate() {
-        result[i + (16 - b.len())] = *v;
-    }
-    // The bytes array are from parquet file and must be the big-endian.
-    // The endian is defined by parquet format, and the reference document
-    // https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66
-    i128::from_be_bytes(result)
-}
-
 /// Extract the min/max statistics from a `ParquetStatistics` object
 macro_rules! get_statistic {
     ($column_statistics:expr, $func:ident, $bytes_func:ident, $target_arrow_type:expr) => {{
@@ -217,24 +201,6 @@ macro_rules! get_null_count_values {
     }};
 }
 
-// Convert parquet column schema to arrow data type, and just consider the
-// decimal data type.
-fn parquet_to_arrow_decimal_type(parquet_column: &ColumnDescriptor) -> Option<DataType> {
-    let type_ptr = parquet_column.self_type_ptr();
-    match type_ptr.get_basic_info().logical_type() {
-        Some(LogicalType::Decimal { scale, precision }) => {
-            Some(DataType::Decimal128(precision as u8, scale as u8))
-        }
-        _ => match type_ptr.get_basic_info().converted_type() {
-            ConvertedType::DECIMAL => Some(DataType::Decimal128(
-                type_ptr.get_precision() as u8,
-                type_ptr.get_scale() as u8,
-            )),
-            _ => None,
-        },
-    }
-}
-
 impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
     fn min_values(&self, column: &Column) -> Option<ArrayRef> {
         get_min_max_values!(self, column, min, min_bytes)
diff --git a/datafusion/core/tests/parquet/filter_pushdown.rs b/datafusion/core/tests/parquet/filter_pushdown.rs
index 657f00d0c..999becafd 100644
--- a/datafusion/core/tests/parquet/filter_pushdown.rs
+++ b/datafusion/core/tests/parquet/filter_pushdown.rs
@@ -266,20 +266,17 @@ async fn single_file_small_data_pages() {
     // page 3:                                     DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: djzdyiecnumrsrcbizwlqzdhnpoiqdh, max: fktdcgtmzvoedpwhfevcvvrtaurzgex, num_nulls not defined] CRC:[none] SZ:7 VC:9216
     // page 4:                                     DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: fktdcgtmzvoedpwhfevcvvrtaurzgex, max: fwtdpgtxwqkkgtgvthhwycrvjiizdifyp, num_nulls not defined] CRC:[none] SZ:7 VC:9216
     // page 5:                                     DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: fwtdpgtxwqkkgtgvthhwycrvjiizdifyp, max: iadnalqpdzthpifrvewossmpqibgtsuin, num_nulls not defined] CRC:[none] SZ:7 VC:7739
-    //
-    // This test currently fails due to https://github.com/apache/arrow-datafusion/issues/3833
-    // (page index pruning not implemented for byte array)
-
-    // TestCase::new(&test_parquet_file)
-    //     .with_name("selective")
-    //     // predicate is chosen carefully to prune pages 0, 1, 2, 3, 4
-    //     // pod = 'iadnalqpdzthpifrvewossmpqibgtsuin'
-    //     .with_filter(col("pod").eq(lit("iadnalqpdzthpifrvewossmpqibgtsuin")))
-    //     .with_pushdown_expected(PushdownExpected::Some)
-    //     .with_page_index_filtering_expected(PageIndexFilteringExpected::Some)
-    //     .with_expected_rows(2574)
-    //     .run()
-    //     .await;
+
+    TestCase::new(&test_parquet_file)
+        .with_name("selective")
+        // predicate is chosen carefully to prune pages 0, 1, 2, 3, 4
+        // pod = 'iadnalqpdzthpifrvewossmpqibgtsuin'
+        .with_filter(col("pod").eq(lit("iadnalqpdzthpifrvewossmpqibgtsuin")))
+        .with_pushdown_expected(PushdownExpected::Some)
+        .with_page_index_filtering_expected(PageIndexFilteringExpected::Some)
+        .with_expected_rows(2574)
+        .run()
+        .await;
 
     // time TV=53819 RL=0 DL=0 DS:                7092 DE:PLAIN
     // --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
@@ -299,6 +296,34 @@ async fn single_file_small_data_pages() {
         .with_expected_rows(9745)
         .run()
         .await;
+
+    // decimal_price TV=53819 RL=0 DL=0
+    // ----------------------------------------------------------------------------
+    // row group 0:
+    //     column index for column decimal_price:
+    //     Boudary order: UNORDERED
+    //                       null count  min                                       max
+    // page-0                         0  1                                         9216
+    // page-1                         0  9217                                      18432
+    // page-2                         0  18433                                     27648
+    // page-3                         0  27649                                     36864
+    // page-4                         0  36865                                     46080
+    // page-5                         0  46081                                     53819
+    //
+    // offset index for column decimal_price:
+    //                            offset   compressed size       first row index
+    // page-0                   5581636            147517                     0
+    // page-1                   5729153            147517                  9216
+    TestCase::new(&test_parquet_file)
+        .with_name("selective_on_decimal")
+        // predicate is chosen carefully to prune pages 1, 2, 3, 4, and 5
+        // decimal_price <= 9200
+        .with_filter(col("decimal_price").lt_eq(lit(9200)))
+        .with_pushdown_expected(PushdownExpected::Some)
+        .with_page_index_filtering_expected(PageIndexFilteringExpected::Some)
+        .with_expected_rows(9200)
+        .run()
+        .await;
 }
 
 /// Expected pushdown behavior
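
A quick check of the expected row count in the new selective_on_decimal case above: page 0 of decimal_price covers the values 1 through 9216, so the predicate decimal_price <= 9200 can only match page 0; pages 1 through 5 are pruned, and the 9200 rows with values 1..9200 remain, which is where with_expected_rows(9200) comes from.
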
diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs
index ab410bd76..255df515c 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -16,7 +16,545 @@
 // under the License.
 
 //! Parquet integration tests
+use arrow::array::Decimal128Array;
+use arrow::{
+    array::{
+        Array, ArrayRef, Date32Array, Date64Array, Float64Array, Int32Array, StringArray,
+        TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
+        TimestampSecondArray,
+    },
+    datatypes::{DataType, Field, Schema},
+    record_batch::RecordBatch,
+    util::pretty::pretty_format_batches,
+};
+use chrono::{Datelike, Duration};
+use datafusion::config::OPT_PARQUET_ENABLE_PAGE_INDEX;
+use datafusion::{
+    datasource::{provider_as_source, TableProvider},
+    physical_plan::{
+        accept, file_format::ParquetExec, metrics::MetricsSet, ExecutionPlan,
+        ExecutionPlanVisitor,
+    },
+    prelude::{ParquetReadOptions, SessionConfig, SessionContext},
+};
+use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder};
+use parquet::arrow::ArrowWriter;
+use parquet::file::properties::WriterProperties;
+use std::sync::Arc;
+use tempfile::NamedTempFile;
+
 mod custom_reader;
 mod filter_pushdown;
 mod page_pruning;
 mod row_group_pruning;
+
+// ----------------------
+// Begin test fixture
+// ----------------------
+
+/// What data to use
+enum Scenario {
+    Timestamps,
+    Dates,
+    Int32,
+    Float64,
+    Decimal,
+    DecimalLargePrecision,
+}
+
+enum Unit {
+    RowGroup,
+    Page,
+}
+
+/// Test fixture that has an execution context that has an external
+/// table "t" registered, pointing at a parquet file made with
+/// `make_test_file`
+struct ContextWithParquet {
+    #[allow(dead_code)]
+    /// temp file parquet data is written to. The file is cleaned up
+    /// when dropped
+    file: NamedTempFile,
+    provider: Arc<dyn TableProvider>,
+    ctx: SessionContext,
+}
+
+/// The output of running one of the test cases
+struct TestOutput {
+    /// The input string
+    sql: String,
+    /// Execution metrics for the Parquet Scan
+    parquet_metrics: MetricsSet,
+    /// number of rows in results
+    result_rows: usize,
+    /// the contents of the input, as a string
+    pretty_input: String,
+    /// the raw results, as a string
+    pretty_results: String,
+}
+
+impl TestOutput {
+    /// retrieve the value of the named metric, if any
+    fn metric_value(&self, metric_name: &str) -> Option<usize> {
+        self.parquet_metrics
+            .sum(|metric| metric.value().name() == metric_name)
+            .map(|v| v.as_usize())
+    }
+
+    /// The number of times the pruning predicate evaluation errors
+    fn predicate_evaluation_errors(&self) -> Option<usize> {
+        self.metric_value("predicate_evaluation_errors")
+    }
+
+    /// The number of row groups pruned
+    fn row_groups_pruned(&self) -> Option<usize> {
+        self.metric_value("row_groups_pruned")
+    }
+
+    /// The number of rows filtered out by the page index
+    fn row_pages_pruned(&self) -> Option<usize> {
+        self.metric_value("page_index_rows_filtered")
+    }
+
+    fn description(&self) -> String {
+        format!(
+            "Input:\n{}\nQuery:\n{}\nOutput:\n{}\nMetrics:\n{}",
+            self.pretty_input, self.sql, self.pretty_results, self.parquet_metrics,
+        )
+    }
+}
+
+/// Creates an execution context that has an external table "t"
+/// registered pointing at a parquet file made with `make_test_file`
+/// and the appropriate scenario
+impl ContextWithParquet {
+    async fn new(scenario: Scenario, unit: Unit) -> Self {
+        Self::with_config(scenario, unit, SessionConfig::new()).await
+    }
+
+    async fn with_config(scenario: Scenario, unit: Unit, config: SessionConfig) -> Self {
+        let file = match unit {
+            Unit::RowGroup => make_test_file_rg(scenario).await,
+            Unit::Page => {
+                config
+                    .config_options
+                    .write()
+                    .set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, true);
+                make_test_file_page(scenario).await
+            }
+        };
+        let parquet_path = file.path().to_string_lossy();
+
+        // now, set up the file as a data source and run a query against it
+        let ctx = SessionContext::with_config(config);
+
+        ctx.register_parquet("t", &parquet_path, ParquetReadOptions::default())
+            .await
+            .unwrap();
+        let provider = ctx.deregister_table("t").unwrap().unwrap();
+        ctx.register_table("t", provider.clone()).unwrap();
+
+        Self {
+            file,
+            provider,
+            ctx,
+        }
+    }
+
+    /// runs a query like "SELECT * from t WHERE <expr>" and returns
+    /// the number of output rows and normalized execution metrics
+    async fn query_with_expr(&mut self, expr: Expr) -> TestOutput {
+        let sql = format!("EXPR only: {:?}", expr);
+        let logical_plan = LogicalPlanBuilder::scan(
+            "t",
+            provider_as_source(self.provider.clone()),
+            None,
+        )
+        .unwrap()
+        .filter(expr)
+        .unwrap()
+        .build()
+        .unwrap();
+        self.run_test(logical_plan, sql).await
+    }
+
+    /// Runs the specified SQL query and returns the number of output
+    /// rows and normalized execution metrics
+    async fn query(&mut self, sql: &str) -> TestOutput {
+        println!("Planning sql {}", sql);
+        let logical_plan = self
+            .ctx
+            .sql(sql)
+            .await
+            .expect("planning")
+            .to_unoptimized_plan();
+        self.run_test(logical_plan, sql).await
+    }
+
+    /// runs the logical plan
+    async fn run_test(
+        &mut self,
+        logical_plan: LogicalPlan,
+        sql: impl Into<String>,
+    ) -> TestOutput {
+        let input = self
+            .ctx
+            .sql("SELECT * from t")
+            .await
+            .expect("planning")
+            .collect()
+            .await
+            .expect("getting input");
+        let pretty_input = pretty_format_batches(&input).unwrap().to_string();
+
+        let logical_plan = self.ctx.optimize(&logical_plan).expect("optimizing plan");
+
+        let physical_plan = self
+            .ctx
+            .create_physical_plan(&logical_plan)
+            .await
+            .expect("creating physical plan");
+
+        let task_ctx = self.ctx.task_ctx();
+        let results = datafusion::physical_plan::collect(physical_plan.clone(), task_ctx)
+            .await
+            .expect("Running");
+
+        // find the parquet metrics
+        struct MetricsFinder {
+            metrics: Option<MetricsSet>,
+        }
+        impl ExecutionPlanVisitor for MetricsFinder {
+            type Error = std::convert::Infallible;
+            fn pre_visit(
+                &mut self,
+                plan: &dyn ExecutionPlan,
+            ) -> Result<bool, Self::Error> {
+                if plan.as_any().downcast_ref::<ParquetExec>().is_some() {
+                    self.metrics = plan.metrics();
+                }
+                // stop searching once we have found the metrics
+                Ok(self.metrics.is_none())
+            }
+        }
+        let mut finder = MetricsFinder { metrics: None };
+        accept(physical_plan.as_ref(), &mut finder).unwrap();
+        let parquet_metrics = finder.metrics.unwrap();
+
+        let result_rows = results.iter().map(|b| b.num_rows()).sum();
+
+        let pretty_results = pretty_format_batches(&results).unwrap().to_string();
+
+        let sql = sql.into();
+        TestOutput {
+            sql,
+            parquet_metrics,
+            result_rows,
+            pretty_input,
+            pretty_results,
+        }
+    }
+}
+
+/// Return a record batch with a few rows of data for all of the supported timestamp
+/// types, with values shifted by the specified offset
+///
+/// Columns are named:
+/// "nanos" --> TimestampNanosecondArray
+/// "micros" --> TimestampMicrosecondArray
+/// "millis" --> TimestampMillisecondArray
+/// "seconds" --> TimestampSecondArray
+/// "names" --> StringArray
+fn make_timestamp_batch(offset: Duration) -> RecordBatch {
+    let ts_strings = vec![
+        Some("2020-01-01T01:01:01.0000000000001"),
+        Some("2020-01-01T01:02:01.0000000000001"),
+        Some("2020-01-01T02:01:01.0000000000001"),
+        None,
+        Some("2020-01-02T01:01:01.0000000000001"),
+    ];
+
+    let offset_nanos = offset.num_nanoseconds().expect("non overflow nanos");
+
+    let ts_nanos = ts_strings
+        .into_iter()
+        .map(|t| {
+            t.map(|t| {
+                offset_nanos
+                    + t.parse::<chrono::NaiveDateTime>()
+                        .unwrap()
+                        .timestamp_nanos()
+            })
+        })
+        .collect::<Vec<_>>();
+
+    let ts_micros = ts_nanos
+        .iter()
+        .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000))
+        .collect::<Vec<_>>();
+
+    let ts_millis = ts_nanos
+        .iter()
+        .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000))
+        .collect::<Vec<_>>();
+
+    let ts_seconds = ts_nanos
+        .iter()
+        .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000000))
+        .collect::<Vec<_>>();
+
+    let names = ts_nanos
+        .iter()
+        .enumerate()
+        .map(|(i, _)| format!("Row {} + {}", i, offset))
+        .collect::<Vec<_>>();
+
+    let arr_nanos = TimestampNanosecondArray::from(ts_nanos);
+    let arr_micros = TimestampMicrosecondArray::from(ts_micros);
+    let arr_millis = TimestampMillisecondArray::from(ts_millis);
+    let arr_seconds = TimestampSecondArray::from(ts_seconds);
+
+    let names = names.iter().map(|s| s.as_str()).collect::<Vec<_>>();
+    let arr_names = StringArray::from(names);
+
+    let schema = Schema::new(vec![
+        Field::new("nanos", arr_nanos.data_type().clone(), true),
+        Field::new("micros", arr_micros.data_type().clone(), true),
+        Field::new("millis", arr_millis.data_type().clone(), true),
+        Field::new("seconds", arr_seconds.data_type().clone(), true),
+        Field::new("name", arr_names.data_type().clone(), true),
+    ]);
+    let schema = Arc::new(schema);
+
+    RecordBatch::try_new(
+        schema,
+        vec![
+            Arc::new(arr_nanos),
+            Arc::new(arr_micros),
+            Arc::new(arr_millis),
+            Arc::new(arr_seconds),
+            Arc::new(arr_names),
+        ],
+    )
+    .unwrap()
+}
+
+/// Return record batch with i32 sequence
+///
+/// Columns are named
+/// "i" -> Int32Array
+fn make_int32_batch(start: i32, end: i32) -> RecordBatch {
+    let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
+    let v: Vec<i32> = (start..end).collect();
+    let array = Arc::new(Int32Array::from(v)) as ArrayRef;
+    RecordBatch::try_new(schema, vec![array.clone()]).unwrap()
+}
+
+/// Return record batch with f64 vector
+///
+/// Columns are named
+/// "f" -> Float64Array
+fn make_f64_batch(v: Vec<f64>) -> RecordBatch {
+    let schema = Arc::new(Schema::new(vec![Field::new("f", DataType::Float64, true)]));
+    let array = Arc::new(Float64Array::from(v)) as ArrayRef;
+    RecordBatch::try_new(schema, vec![array.clone()]).unwrap()
+}
+
+/// Return record batch with decimal vector
+///
+/// Columns are named
+/// "decimal_col" -> DecimalArray
+fn make_decimal_batch(v: Vec<i128>, precision: u8, scale: u8) -> RecordBatch {
+    let schema = Arc::new(Schema::new(vec![Field::new(
+        "decimal_col",
+        DataType::Decimal128(precision, scale),
+        true,
+    )]));
+    let array = Arc::new(
+        Decimal128Array::from_iter_values(v)
+            .with_precision_and_scale(precision, scale)
+            .unwrap(),
+    ) as ArrayRef;
+    RecordBatch::try_new(schema, vec![array.clone()]).unwrap()
+}
+
+/// Return record batch with a few rows of data for all of the supported date
+/// types with the specified offset (in days)
+///
+/// Columns are named:
+/// "date32" --> Date32Array
+/// "date64" --> Date64Array
+/// "names" --> StringArray
+fn make_date_batch(offset: Duration) -> RecordBatch {
+    let date_strings = vec![
+        Some("2020-01-01"),
+        Some("2020-01-02"),
+        Some("2020-01-03"),
+        None,
+        Some("2020-01-04"),
+    ];
+
+    let names = date_strings
+        .iter()
+        .enumerate()
+        .map(|(i, val)| format!("Row {} + {}: {:?}", i, offset, val))
+        .collect::<Vec<_>>();
+
+    // Copied from `cast.rs` cast kernel due to lack of temporal kernels
+    // https://github.com/apache/arrow-rs/issues/527
+    const EPOCH_DAYS_FROM_CE: i32 = 719_163;
+
+    let date_seconds = date_strings
+        .iter()
+        .map(|t| {
+            t.map(|t| {
+                let t = t.parse::<chrono::NaiveDate>().unwrap();
+                let t = t + offset;
+                t.num_days_from_ce() - EPOCH_DAYS_FROM_CE
+            })
+        })
+        .collect::<Vec<_>>();
+
+    let date_millis = date_strings
+        .into_iter()
+        .map(|t| {
+            t.map(|t| {
+                let t = t
+                    .parse::<chrono::NaiveDate>()
+                    .unwrap()
+                    .and_time(chrono::NaiveTime::from_hms_opt(0, 0, 0).unwrap());
+                let t = t + offset;
+                t.timestamp_millis()
+            })
+        })
+        .collect::<Vec<_>>();
+
+    let arr_date32 = Date32Array::from(date_seconds);
+    let arr_date64 = Date64Array::from(date_millis);
+
+    let names = names.iter().map(|s| s.as_str()).collect::<Vec<_>>();
+    let arr_names = StringArray::from(names);
+
+    let schema = Schema::new(vec![
+        Field::new("date32", arr_date32.data_type().clone(), true),
+        Field::new("date64", arr_date64.data_type().clone(), true),
+        Field::new("name", arr_names.data_type().clone(), true),
+    ]);
+    let schema = Arc::new(schema);
+
+    RecordBatch::try_new(
+        schema,
+        vec![
+            Arc::new(arr_date32),
+            Arc::new(arr_date64),
+            Arc::new(arr_names),
+        ],
+    )
+    .unwrap()
+}
+
+fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
+    match scenario {
+        Scenario::Timestamps => {
+            vec![
+                make_timestamp_batch(Duration::seconds(0)),
+                make_timestamp_batch(Duration::seconds(10)),
+                make_timestamp_batch(Duration::minutes(10)),
+                make_timestamp_batch(Duration::days(10)),
+            ]
+        }
+        Scenario::Dates => {
+            vec![
+                make_date_batch(Duration::days(0)),
+                make_date_batch(Duration::days(10)),
+                make_date_batch(Duration::days(300)),
+                make_date_batch(Duration::days(3600)),
+            ]
+        }
+        Scenario::Int32 => {
+            vec![
+                make_int32_batch(-5, 0),
+                make_int32_batch(-4, 1),
+                make_int32_batch(0, 5),
+                make_int32_batch(5, 10),
+            ]
+        }
+        Scenario::Float64 => {
+            vec![
+                make_f64_batch(vec![-5.0, -4.0, -3.0, -2.0, -1.0]),
+                make_f64_batch(vec![-4.0, -3.0, -2.0, -1.0, 0.0]),
+                make_f64_batch(vec![0.0, 1.0, 2.0, 3.0, 4.0]),
+                make_f64_batch(vec![5.0, 6.0, 7.0, 8.0, 9.0]),
+            ]
+        }
+        Scenario::Decimal => {
+            // decimal record batch
+            vec![
+                make_decimal_batch(vec![100, 200, 300, 400, 600], 9, 2),
+                make_decimal_batch(vec![-500, 100, 300, 400, 600], 9, 2),
+                make_decimal_batch(vec![2000, 3000, 3000, 4000, 6000], 9, 2),
+            ]
+        }
+        Scenario::DecimalLargePrecision => {
+            // decimal record batch with large precision,
+            // so the data will be stored as FIXED_LEN_BYTE_ARRAY
+            vec![
+                make_decimal_batch(vec![100, 200, 300, 400, 600], 38, 2),
+                make_decimal_batch(vec![-500, 100, 300, 400, 600], 38, 2),
+                make_decimal_batch(vec![2000, 3000, 3000, 4000, 6000], 38, 2),
+            ]
+        }
+    }
+}
+
+/// Create a test parquet file with various data types
+async fn make_test_file_rg(scenario: Scenario) -> NamedTempFile {
+    let mut output_file = tempfile::Builder::new()
+        .prefix("parquet_pruning")
+        .suffix(".parquet")
+        .tempfile()
+        .expect("tempfile creation");
+
+    let props = WriterProperties::builder()
+        .set_max_row_group_size(5)
+        .build();
+
+    let batches = create_data_batch(scenario);
+
+    let schema = batches[0].schema();
+
+    let mut writer = ArrowWriter::try_new(&mut output_file, schema, Some(props)).unwrap();
+
+    for batch in batches {
+        writer.write(&batch).expect("writing batch");
+    }
+    writer.close().unwrap();
+
+    output_file
+}
+
+async fn make_test_file_page(scenario: Scenario) -> NamedTempFile {
+    let mut output_file = tempfile::Builder::new()
+        .prefix("parquet_page_pruning")
+        .suffix(".parquet")
+        .tempfile()
+        .expect("tempfile creation");
+
+    // limit each data page to 5 rows, so results should match the RowGroup tests
+    let props = WriterProperties::builder()
+        .set_data_page_row_count_limit(5)
+        .set_write_batch_size(5)
+        .build();
+
+    let batches = create_data_batch(scenario);
+
+    let schema = batches[0].schema();
+
+    let mut writer = ArrowWriter::try_new(&mut output_file, schema, Some(props)).unwrap();
+
+    for batch in batches {
+        writer.write(&batch).expect("writing batch");
+    }
+    writer.close().unwrap();
+    output_file
+}
diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs
index 8d8c3bcae..8c67bc346 100644
--- a/datafusion/core/tests/parquet/page_pruning.rs
+++ b/datafusion/core/tests/parquet/page_pruning.rs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::parquet::Unit::Page;
+use crate::parquet::{ContextWithParquet, Scenario};
 use datafusion::config::ConfigOptions;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::file_format::FileFormat;
@@ -23,7 +25,7 @@ use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::SessionContext;
-use datafusion_common::Statistics;
+use datafusion_common::{ScalarValue, Statistics};
 use datafusion_expr::{col, lit, Expr};
 use object_store::path::Path;
 use object_store::ObjectMeta;
@@ -142,6 +144,22 @@ async fn page_index_filter_one_col() {
 
     // should be the same as `month = 1`
     assert_eq!(batch.num_rows(), 645);
+
+    let session_ctx = SessionContext::new();
+    let task_ctx = session_ctx.task_ctx();
+
+    // 5. create filter date_string_col = '01/01/09'
+    let filter = col("date_string_col").eq(lit("01/01/09"));
+    let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await;
+    let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
+    let batch = results.next().await.unwrap().unwrap();
+
+    // only two pages should match the filter
+    //                                  min                                        max
+    // page-20                        0  01/01/09                                  01/02/09
+    // page-21                        0  01/01/09                                  01/01/09
+    // each page has 7 rows
+    assert_eq!(batch.num_rows(), 14);
 }
 
 #[tokio::test]
@@ -204,3 +222,495 @@ async fn page_index_filter_multi_col() {
     let batch = results.next().await.unwrap().unwrap();
     assert_eq!(batch.num_rows(), 7300);
 }
+
+async fn test_prune(
+    case_data_type: Scenario,
+    sql: &str,
+    expected_errors: Option<usize>,
+    expected_row_pages_pruned: Option<usize>,
+    expected_results: usize,
+) {
+    let output = ContextWithParquet::new(case_data_type, Page)
+        .await
+        .query(sql)
+        .await;
+
+    println!("{}", output.description());
+    assert_eq!(output.predicate_evaluation_errors(), expected_errors);
+    assert_eq!(output.row_pages_pruned(), expected_row_pages_pruned);
+    assert_eq!(
+        output.result_rows,
+        expected_results,
+        "{}",
+        output.description()
+    );
+}
+
+#[tokio::test]
+//                       null count  min                                       max
+// page-0                         1  2020-01-01T01:01:01.000000000             2020-01-02T01:01:01.000000000
+// page-1                         1  2020-01-01T01:01:11.000000000             2020-01-02T01:01:11.000000000
+// page-2                         1  2020-01-01T01:11:01.000000000             2020-01-02T01:11:01.000000000
+// page-3                         1  2020-01-11T01:01:01.000000000             2020-01-12T01:01:01.000000000
+async fn prune_timestamps_nanos() {
+    test_prune(
+        Scenario::Timestamps,
+        "SELECT * FROM t where nanos < to_timestamp('2020-01-02 01:01:11Z')",
+        Some(0),
+        Some(5),
+        10,
+    )
+    .await;
+}
+
+#[tokio::test]
+//                         null count  min                                       max
+// page-0                         1  2020-01-01T01:01:01.000000                2020-01-02T01:01:01.000000
+// page-1                         1  2020-01-01T01:01:11.000000                2020-01-02T01:01:11.000000
+// page-2                         1  2020-01-01T01:11:01.000000                2020-01-02T01:11:01.000000
+// page-3                         1  2020-01-11T01:01:01.000000                2020-01-12T01:01:01.000000
+async fn prune_timestamps_micros() {
+    test_prune(
+        Scenario::Timestamps,
+        "SELECT * FROM t where micros < to_timestamp_micros('2020-01-02 01:01:11Z')",
+        Some(0),
+        Some(5),
+        10,
+    )
+    .await;
+}
+
+#[tokio::test]
+//                      null count  min                                       max
+// page-0                         1  2020-01-01T01:01:01.000                   2020-01-02T01:01:01.000
+// page-1                         1  2020-01-01T01:01:11.000                   2020-01-02T01:01:11.000
+// page-2                         1  2020-01-01T01:11:01.000                   2020-01-02T01:11:01.000
+// page-3                         1  2020-01-11T01:01:01.000                   2020-01-12T01:01:01.000
+async fn prune_timestamps_millis() {
+    test_prune(
+        Scenario::Timestamps,
+        "SELECT * FROM t where millis < to_timestamp_millis('2020-01-02 01:01:11Z')",
+        Some(0),
+        Some(5),
+        10,
+    )
+    .await;
+}
+
+#[tokio::test]
+//                      null count  min                                       max
+// page-0                         1  1577840461                                1577926861
+// page-1                         1  1577840471                                1577926871
+// page-2                         1  1577841061                                1577927461
+// page-3                         1  1578704461                                1578790861
+
+async fn prune_timestamps_seconds() {
+    test_prune(
+        Scenario::Timestamps,
+        "SELECT * FROM t where seconds < to_timestamp_seconds('2020-01-02 01:01:11Z')",
+        Some(0),
+        Some(5),
+        10,
+    )
+    .await;
+}
+
+#[tokio::test]
+//                       null count  min                                       max
+// page-0                         1  2020-01-01                                2020-01-04
+// page-1                         1  2020-01-11                                2020-01-14
+// page-2                         1  2020-10-27                                2020-10-30
+// page-3                         1  2029-11-09                                2029-11-12
+async fn prune_date32() {
+    test_prune(
+        Scenario::Dates,
+        "SELECT * FROM t where date32 < cast('2020-01-02' as date)",
+        Some(0),
+        Some(15),
+        1,
+    )
+    .await;
+}
+
+#[tokio::test]
+//                      null count  min                                       max
+// page-0                         1  2020-01-01                                2020-01-04
+// page-1                         1  2020-01-11                                2020-01-14
+// page-2                         1  2020-10-27                                2020-10-30
+// page-3                         1  2029-11-09                                2029-11-12
+async fn prune_date64() {
+    // workaround for not being able to cast Date32 to Date64 automatically
+    let date = "2020-01-02"
+        .parse::<chrono::NaiveDate>()
+        .unwrap()
+        .and_time(chrono::NaiveTime::from_hms_opt(0, 0, 0).unwrap());
+    let date = ScalarValue::Date64(Some(date.timestamp_millis()));
+
+    let output = ContextWithParquet::new(Scenario::Dates, Page)
+        .await
+        .query_with_expr(col("date64").lt(lit(date)))
+        .await;
+
+    println!("{}", output.description());
+    // This should prune out pages without error
+    assert_eq!(output.predicate_evaluation_errors(), Some(0));
+    assert_eq!(output.row_pages_pruned(), Some(15));
+    assert_eq!(output.result_rows, 1, "{}", output.description());
+}
+
+#[tokio::test]
+//                      null count  min                                       max
+// page-0                         0  -5                                        -1
+// page-1                         0  -4                                        0
+// page-2                         0  0                                         4
+// page-3                         0  5                                         9
+async fn prune_int32_lt() {
+    test_prune(
+        Scenario::Int32,
+        "SELECT * FROM t where i < 1",
+        Some(0),
+        Some(5),
+        11,
+    )
+    .await;
+    // the result of "SELECT * FROM t where i < 1" is the same as that of
+    // "SELECT * FROM t where -i > -1"
+    test_prune(
+        Scenario::Int32,
+        "SELECT * FROM t where -i > -1",
+        Some(0),
+        Some(5),
+        11,
+    )
+    .await;
+}
+
+#[tokio::test]
+async fn prune_int32_gt() {
+    test_prune(
+        Scenario::Int32,
+        "SELECT * FROM t where i > 8",
+        Some(0),
+        Some(15),
+        1,
+    )
+    .await;
+
+    test_prune(
+        Scenario::Int32,
+        "SELECT * FROM t where -i < -8",
+        Some(0),
+        Some(15),
+        1,
+    )
+    .await;
+}
+
+#[tokio::test]
+async fn prune_int32_eq() {
+    test_prune(
+        Scenario::Int32,
+        "SELECT * FROM t where i = 1",
+        Some(0),
+        Some(15),
+        1,
+    )
+    .await;
+}
+#[tokio::test]
+#[ignore]
+async fn prune_int32_scalar_fun_and_eq() {
+    test_prune(
+        Scenario::Int32,
+        "SELECT * FROM t where abs(i) = 1  and i = 1",
+        Some(0),
+        Some(15),
+        1,
+    )
+    .await;
+}
+
+#[tokio::test]
+#[ignore]
+async fn prune_int32_scalar_fun() {
+    test_prune(
+        Scenario::Int32,
+        "SELECT * FROM t where abs(i) = 1",
+        Some(0),
+        Some(0),
+        3,
+    )
+    .await;
+}
+
+#[tokio::test]
+#[ignore]
+async fn prune_int32_complex_expr() {
+    test_prune(
+        Scenario::Int32,
+        "SELECT * FROM t where i+1 = 1",
+        Some(0),
+        Some(0),
+        2,
+    )
+    .await;
+}
+
+#[tokio::test]
+#[ignore]
+async fn prune_int32_complex_expr_subtract() {
+    test_prune(
+        Scenario::Int32,
+        "SELECT * FROM t where 1-i > 1",
+        Some(0),
+        Some(0),
+        9,
+    )
+    .await;
+}
+
+#[tokio::test]
+//                      null count  min                                       max
+// page-0                         0  -5.0                                      -1.0
+// page-1                         0  -4.0                                      0.0
+// page-2                         0  0.0                                       4.0
+// page-3                         0  5.0                                       9.0
+async fn prune_f64_lt() {
+    test_prune(
+        Scenario::Float64,
+        "SELECT * FROM t where f < 1",
+        Some(0),
+        Some(5),
+        11,
+    )
+    .await;
+    test_prune(
+        Scenario::Float64,
+        "SELECT * FROM t where -f > -1",
+        Some(0),
+        Some(5),
+        11,
+    )
+    .await;
+}
+
+#[tokio::test]
+#[ignore]
+async fn prune_f64_scalar_fun_and_gt() {
+    // for "SELECT * FROM t where abs(f - 1) <= 0.000001  and f >= 0.1",
+    // only the "f >= 0.1" part can be used for pruning
+    test_prune(
+        Scenario::Float64,
+        "SELECT * FROM t where abs(f - 1) <= 0.000001  and f >= 0.1",
+        Some(0),
+        Some(2),
+        1,
+    )
+    .await;
+}
+
+#[tokio::test]
+#[ignore]
+async fn prune_f64_scalar_fun() {
+    // pruning on "SELECT * FROM t where abs(f-1) <= 0.000001" is not supported
+    test_prune(
+        Scenario::Float64,
+        "SELECT * FROM t where abs(f-1) <= 0.000001",
+        Some(0),
+        Some(0),
+        1,
+    )
+    .await;
+}
+
+#[tokio::test]
+#[ignore]
+async fn prune_f64_complex_expr() {
+    // pruning on "SELECT * FROM t where f+1 > 1.1" is not supported
+    test_prune(
+        Scenario::Float64,
+        "SELECT * FROM t where f+1 > 1.1",
+        Some(0),
+        Some(0),
+        9,
+    )
+    .await;
+}
+
+#[tokio::test]
+#[ignore]
+async fn prune_f64_complex_expr_subtract() {
+    // pruning on "SELECT * FROM t where 1-f > 1" is not supported
+    test_prune(
+        Scenario::Float64,
+        "SELECT * FROM t where 1-f > 1",
+        Some(0),
+        Some(0),
+        9,
+    )
+    .await;
+}
+
+#[tokio::test]
+//                      null count  min                                       max
+// page-0                         0  -5                                        -1
+// page-1                         0  -4                                        0
+// page-2                         0  0                                         4
+// page-3                         0  5                                         9
+async fn prune_int32_eq_in_list() {
+    // result of "SELECT * FROM t where i in (1)"
+    test_prune(
+        Scenario::Int32,
+        "SELECT * FROM t where i in (1)",
+        Some(0),
+        Some(15),
+        1,
+    )
+    .await;
+}
+
+#[tokio::test]
+async fn prune_int32_eq_in_list_negated() {
+    // "SELECT * FROM t where i not in (1)" prunes nothing
+    test_prune(
+        Scenario::Int32,
+        "SELECT * FROM t where i not in (1)",
+        Some(0),
+        Some(0),
+        19,
+    )
+    .await;
+}
+
+#[tokio::test]
+async fn prune_decimal_lt() {
+    // The data type of decimal_col is decimal(9,2)
+    // There are three pages, each with 5 rows:
+    // [1.00, 6.00], [-5.00,6.00], [20.00,60.00]
+    test_prune(
+        Scenario::Decimal,
+        "SELECT * FROM t where decimal_col < 4",
+        Some(0),
+        Some(5),
+        6,
+    )
+    .await;
+    // compare with the casted decimal value
+    test_prune(
+        Scenario::Decimal,
+        "SELECT * FROM t where decimal_col < cast(4.55 as decimal(20,2))",
+        Some(0),
+        Some(5),
+        8,
+    )
+    .await;
+
+    // The data type of decimal_col is decimal(38,2)
+    test_prune(
+        Scenario::DecimalLargePrecision,
+        "SELECT * FROM t where decimal_col < 4",
+        Some(0),
+        Some(5),
+        6,
+    )
+    .await;
+    // compare with the casted decimal value
+    test_prune(
+        Scenario::DecimalLargePrecision,
+        "SELECT * FROM t where decimal_col < cast(4.55 as decimal(20,2))",
+        Some(0),
+        Some(5),
+        8,
+    )
+    .await;
+}
+
+#[tokio::test]
+async fn prune_decimal_eq() {
+    // The data type of decimal_col is decimal(9,2)
+    // There are three pages:
+    // [1.00, 6.00], [-5.00,6.00], [20.00,60.00]
+    test_prune(
+        Scenario::Decimal,
+        "SELECT * FROM t where decimal_col = 4",
+        Some(0),
+        Some(5),
+        2,
+    )
+    .await;
+    test_prune(
+        Scenario::Decimal,
+        "SELECT * FROM t where decimal_col = 4.00",
+        Some(0),
+        Some(5),
+        2,
+    )
+    .await;
+
+    // The data type of decimal_col is decimal(38,2)
+    test_prune(
+        Scenario::DecimalLargePrecision,
+        "SELECT * FROM t where decimal_col = 4",
+        Some(0),
+        Some(5),
+        2,
+    )
+    .await;
+    test_prune(
+        Scenario::DecimalLargePrecision,
+        "SELECT * FROM t where decimal_col = 4.00",
+        Some(0),
+        Some(5),
+        2,
+    )
+    .await;
+    test_prune(
+        Scenario::DecimalLargePrecision,
+        "SELECT * FROM t where decimal_col = 30.00",
+        Some(0),
+        Some(10),
+        2,
+    )
+    .await;
+}
+
+#[tokio::test]
+async fn prune_decimal_in_list() {
+    // The data type of decimal_col is decimal(9,2)
+    // There are three pages:
+    // [1.00, 6.00], [-5.00,6.00], [20.00,60.00]
+    test_prune(
+        Scenario::Decimal,
+        "SELECT * FROM t where decimal_col in (4,3,2,123456789123)",
+        Some(0),
+        Some(5),
+        5,
+    )
+    .await;
+    test_prune(
+        Scenario::Decimal,
+        "SELECT * FROM t where decimal_col in (4.00,3.00,11.2345,1)",
+        Some(0),
+        Some(5),
+        6,
+    )
+    .await;
+
+    // The data type of decimal_col is decimal(38,2)
+    test_prune(
+        Scenario::DecimalLargePrecision,
+        "SELECT * FROM t where decimal_col in (4,3,2,123456789123)",
+        Some(0),
+        Some(5),
+        5,
+    )
+    .await;
+    test_prune(
+        Scenario::DecimalLargePrecision,
+        "SELECT * FROM t where decimal_col in (4.00,3.00,11.2345,1)",
+        Some(0),
+        Some(5),
+        6,
+    )
+    .await;
+}
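
To sanity-check the expected counts in the decimal tests above: make_test_file_page writes three 5-row batches with a data page row count limit of 5, so each decimal scenario produces 3 pages of 5 rows (15 rows total) with per-page ranges [1.00, 6.00], [-5.00, 6.00] and [20.00, 60.00]. For decimal_col < 4 only the third page is ruled out by its minimum, so 1 page x 5 rows = 5 rows are pruned and 6 matching rows remain; for decimal_col = 30.00 the first two pages are ruled out by their maxima, so 2 x 5 = 10 rows are pruned and 2 rows remain.
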
diff --git a/datafusion/core/tests/parquet/row_group_pruning.rs b/datafusion/core/tests/parquet/row_group_pruning.rs
index c3de01b38..c7e3c533f 100644
--- a/datafusion/core/tests/parquet/row_group_pruning.rs
+++ b/datafusion/core/tests/parquet/row_group_pruning.rs
@@ -18,32 +18,12 @@
 //! This file contains an end to end test of parquet pruning. It writes
 //! data into a parquet file and then verifies row groups are pruned as
 //! expected.
-use std::sync::Arc;
-
-use arrow::array::Decimal128Array;
-use arrow::{
-    array::{
-        Array, ArrayRef, Date32Array, Date64Array, Float64Array, Int32Array, StringArray,
-        TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
-        TimestampSecondArray,
-    },
-    datatypes::{DataType, Field, Schema},
-    record_batch::RecordBatch,
-    util::pretty::pretty_format_batches,
-};
-use chrono::{Datelike, Duration};
-use datafusion::{
-    datasource::{provider_as_source, TableProvider},
-    physical_plan::{
-        accept, file_format::ParquetExec, metrics::MetricsSet, ExecutionPlan,
-        ExecutionPlanVisitor,
-    },
-    prelude::{ParquetReadOptions, SessionConfig, SessionContext},
-    scalar::ScalarValue,
-};
-use datafusion_expr::{col, lit, Expr, LogicalPlan, LogicalPlanBuilder};
-use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};
-use tempfile::NamedTempFile;
+use datafusion::prelude::SessionConfig;
+use datafusion_common::ScalarValue;
+
+use crate::parquet::Unit::RowGroup;
+use crate::parquet::{ContextWithParquet, Scenario};
+use datafusion_expr::{col, lit};
 
 async fn test_prune(
     case_data_type: Scenario,
@@ -52,7 +32,7 @@ async fn test_prune(
     expected_row_group_pruned: Option<usize>,
     expected_results: usize,
 ) {
-    let output = ContextWithParquet::new(case_data_type)
+    let output = ContextWithParquet::new(case_data_type, RowGroup)
         .await
         .query(sql)
         .await;
@@ -137,7 +117,7 @@ async fn prune_date64() {
         .and_time(chrono::NaiveTime::from_hms_opt(0, 0, 0).unwrap());
     let date = ScalarValue::Date64(Some(date.timestamp_millis()));
 
-    let output = ContextWithParquet::new(Scenario::Dates)
+    let output = ContextWithParquet::new(Scenario::Dates, RowGroup)
         .await
         .query_with_expr(col("date64").lt(lit(date)))
         // .query(
@@ -169,7 +149,7 @@ async fn prune_disabled() {
     let expected_rows = 10;
     let config = SessionConfig::new().with_parquet_pruning(false);
 
-    let output = ContextWithParquet::with_config(Scenario::Timestamps, config)
+    let output = ContextWithParquet::with_config(Scenario::Timestamps, RowGroup, config)
         .await
         .query(query)
         .await;
@@ -503,465 +483,3 @@ async fn prune_decimal_in_list() {
     )
     .await;
 }
-
-// ----------------------
-// Begin test fixture
-// ----------------------
-
-/// What data to use
-enum Scenario {
-    Timestamps,
-    Dates,
-    Int32,
-    Float64,
-    Decimal,
-    DecimalLargePrecision,
-}
-
-/// Test fixture that has an execution context that has an external
-/// table "t" registered, pointing at a parquet file made with
-/// `make_test_file`
-struct ContextWithParquet {
-    #[allow(dead_code)]
-    /// temp file that the parquet data is written to. The file is
-    /// cleaned up when dropped
-    file: NamedTempFile,
-    provider: Arc<dyn TableProvider>,
-    ctx: SessionContext,
-}
-
-/// The output of running one of the test cases
-struct TestOutput {
-    /// The input string
-    sql: String,
-    /// Execution metrics for the Parquet Scan
-    parquet_metrics: MetricsSet,
-    /// number of rows in results
-    result_rows: usize,
-    /// the contents of the input, as a string
-    pretty_input: String,
-    /// the raw results, as a string
-    pretty_results: String,
-}
-
-impl TestOutput {
-    /// retrieve the value of the named metric, if any
-    fn metric_value(&self, metric_name: &str) -> Option<usize> {
-        self.parquet_metrics
-            .sum(|metric| metric.value().name() == metric_name)
-            .map(|v| v.as_usize())
-    }
-
-    /// The number of times the pruning predicate evaluation errors
-    fn predicate_evaluation_errors(&self) -> Option<usize> {
-        self.metric_value("predicate_evaluation_errors")
-    }
-
-    /// The number of row groups pruned
-    fn row_groups_pruned(&self) -> Option<usize> {
-        self.metric_value("row_groups_pruned")
-    }
-
-    fn description(&self) -> String {
-        format!(
-            "Input:\n{}\nQuery:\n{}\nOutput:\n{}\nMetrics:\n{}",
-            self.pretty_input, self.sql, self.pretty_results, self.parquet_metrics,
-        )
-    }
-}
-
-/// Creates an execution context that has an external table "t"
-/// registered pointing at a parquet file made with `make_test_file`
-/// and the appropriate scenario
-impl ContextWithParquet {
-    async fn new(scenario: Scenario) -> Self {
-        Self::with_config(scenario, SessionConfig::new()).await
-    }
-
-    async fn with_config(scenario: Scenario, config: SessionConfig) -> Self {
-        let file = make_test_file(scenario).await;
-        let parquet_path = file.path().to_string_lossy();
-
-        // now, set up the file as a data source and run a query against it
-        let ctx = SessionContext::with_config(config);
-
-        ctx.register_parquet("t", &parquet_path, ParquetReadOptions::default())
-            .await
-            .unwrap();
-        let provider = ctx.deregister_table("t").unwrap().unwrap();
-        ctx.register_table("t", provider.clone()).unwrap();
-
-        Self {
-            file,
-            provider,
-            ctx,
-        }
-    }
-
-    /// runs a query like "SELECT * from t WHERE <expr>" and returns
-    /// the number of output rows and normalized execution metrics
-    async fn query_with_expr(&mut self, expr: Expr) -> TestOutput {
-        let sql = format!("EXPR only: {:?}", expr);
-        let logical_plan = LogicalPlanBuilder::scan(
-            "t",
-            provider_as_source(self.provider.clone()),
-            None,
-        )
-        .unwrap()
-        .filter(expr)
-        .unwrap()
-        .build()
-        .unwrap();
-        self.run_test(logical_plan, sql).await
-    }
-
-    /// Runs the specified SQL query and returns the number of output
-    /// rows and normalized execution metrics
-    async fn query(&mut self, sql: &str) -> TestOutput {
-        println!("Planning sql {}", sql);
-        let logical_plan = self
-            .ctx
-            .sql(sql)
-            .await
-            .expect("planning")
-            .to_unoptimized_plan();
-        self.run_test(logical_plan, sql).await
-    }
-
-    /// runs the logical plan
-    async fn run_test(
-        &mut self,
-        logical_plan: LogicalPlan,
-        sql: impl Into<String>,
-    ) -> TestOutput {
-        let input = self
-            .ctx
-            .sql("SELECT * from t")
-            .await
-            .expect("planning")
-            .collect()
-            .await
-            .expect("getting input");
-        let pretty_input = pretty_format_batches(&input).unwrap().to_string();
-
-        let logical_plan = self.ctx.optimize(&logical_plan).expect("optimizing plan");
-
-        let physical_plan = self
-            .ctx
-            .create_physical_plan(&logical_plan)
-            .await
-            .expect("creating physical plan");
-
-        let task_ctx = self.ctx.task_ctx();
-        let results = datafusion::physical_plan::collect(physical_plan.clone(), task_ctx)
-            .await
-            .expect("Running");
-
-        // find the parquet metrics
-        struct MetricsFinder {
-            metrics: Option<MetricsSet>,
-        }
-        impl ExecutionPlanVisitor for MetricsFinder {
-            type Error = std::convert::Infallible;
-            fn pre_visit(
-                &mut self,
-                plan: &dyn ExecutionPlan,
-            ) -> Result<bool, Self::Error> {
-                if plan.as_any().downcast_ref::<ParquetExec>().is_some() {
-                    self.metrics = plan.metrics();
-                }
-                // stop searching once we have found the metrics
-                Ok(self.metrics.is_none())
-            }
-        }
-        let mut finder = MetricsFinder { metrics: None };
-        accept(physical_plan.as_ref(), &mut finder).unwrap();
-        let parquet_metrics = finder.metrics.unwrap();
-
-        let result_rows = results.iter().map(|b| b.num_rows()).sum();
-
-        let pretty_results = pretty_format_batches(&results).unwrap().to_string();
-
-        let sql = sql.into();
-        TestOutput {
-            sql,
-            parquet_metrics,
-            result_rows,
-            pretty_input,
-            pretty_results,
-        }
-    }
-}
-
-/// Create a test parquet file with various data types
-async fn make_test_file(scenario: Scenario) -> NamedTempFile {
-    let mut output_file = tempfile::Builder::new()
-        .prefix("parquet_pruning")
-        .suffix(".parquet")
-        .tempfile()
-        .expect("tempfile creation");
-
-    let props = WriterProperties::builder()
-        .set_max_row_group_size(5)
-        .build();
-
-    let batches = match scenario {
-        Scenario::Timestamps => {
-            vec![
-                make_timestamp_batch(Duration::seconds(0)),
-                make_timestamp_batch(Duration::seconds(10)),
-                make_timestamp_batch(Duration::minutes(10)),
-                make_timestamp_batch(Duration::days(10)),
-            ]
-        }
-        Scenario::Dates => {
-            vec![
-                make_date_batch(Duration::days(0)),
-                make_date_batch(Duration::days(10)),
-                make_date_batch(Duration::days(300)),
-                make_date_batch(Duration::days(3600)),
-            ]
-        }
-        Scenario::Int32 => {
-            vec![
-                make_int32_batch(-5, 0),
-                make_int32_batch(-4, 1),
-                make_int32_batch(0, 5),
-                make_int32_batch(5, 10),
-            ]
-        }
-        Scenario::Float64 => {
-            vec![
-                make_f64_batch(vec![-5.0, -4.0, -3.0, -2.0, -1.0]),
-                make_f64_batch(vec![-4.0, -3.0, -2.0, -1.0, 0.0]),
-                make_f64_batch(vec![0.0, 1.0, 2.0, 3.0, 4.0]),
-                make_f64_batch(vec![5.0, 6.0, 7.0, 8.0, 9.0]),
-            ]
-        }
-        Scenario::Decimal => {
-            // decimal record batch
-            vec![
-                make_decimal_batch(vec![100, 200, 300, 400, 600], 9, 2),
-                make_decimal_batch(vec![-500, 100, 300, 400, 600], 9, 2),
-                make_decimal_batch(vec![2000, 3000, 3000, 4000, 6000], 9, 2),
-            ]
-        }
-        Scenario::DecimalLargePrecision => {
-            // decimal record batch with large precision,
-            // and the data will be stored as FIXED_LEN_BYTE_ARRAY
-            vec![
-                make_decimal_batch(vec![100, 200, 300, 400, 600], 38, 2),
-                make_decimal_batch(vec![-500, 100, 300, 400, 600], 38, 2),
-                make_decimal_batch(vec![2000, 3000, 3000, 4000, 6000], 38, 2),
-            ]
-        }
-    };
-
-    let schema = batches[0].schema();
-
-    let mut writer = ArrowWriter::try_new(&mut output_file, schema, Some(props)).unwrap();
-
-    for batch in batches {
-        writer.write(&batch).expect("writing batch");
-    }
-    writer.close().unwrap();
-
-    output_file
-}
-
-/// Return a record batch with a few rows of data for all of the supported
-/// timestamp types, with values shifted by the specified offset
-///
-/// Columns are named:
-/// "nanos" --> TimestampNanosecondArray
-/// "micros" --> TimestampMicrosecondArray
-/// "millis" --> TimestampMillisecondArray
-/// "seconds" --> TimestampSecondArray
-/// "names" --> StringArray
-fn make_timestamp_batch(offset: Duration) -> RecordBatch {
-    let ts_strings = vec![
-        Some("2020-01-01T01:01:01.0000000000001"),
-        Some("2020-01-01T01:02:01.0000000000001"),
-        Some("2020-01-01T02:01:01.0000000000001"),
-        None,
-        Some("2020-01-02T01:01:01.0000000000001"),
-    ];
-
-    let offset_nanos = offset.num_nanoseconds().expect("non overflow nanos");
-
-    let ts_nanos = ts_strings
-        .into_iter()
-        .map(|t| {
-            t.map(|t| {
-                offset_nanos
-                    + t.parse::<chrono::NaiveDateTime>()
-                        .unwrap()
-                        .timestamp_nanos()
-            })
-        })
-        .collect::<Vec<_>>();
-
-    let ts_micros = ts_nanos
-        .iter()
-        .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000))
-        .collect::<Vec<_>>();
-
-    let ts_millis = ts_nanos
-        .iter()
-        .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000))
-        .collect::<Vec<_>>();
-
-    let ts_seconds = ts_nanos
-        .iter()
-        .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000000))
-        .collect::<Vec<_>>();
-
-    let names = ts_nanos
-        .iter()
-        .enumerate()
-        .map(|(i, _)| format!("Row {} + {}", i, offset))
-        .collect::<Vec<_>>();
-
-    let arr_nanos = TimestampNanosecondArray::from(ts_nanos);
-    let arr_micros = TimestampMicrosecondArray::from(ts_micros);
-    let arr_millis = TimestampMillisecondArray::from(ts_millis);
-    let arr_seconds = TimestampSecondArray::from(ts_seconds);
-
-    let names = names.iter().map(|s| s.as_str()).collect::<Vec<_>>();
-    let arr_names = StringArray::from(names);
-
-    let schema = Schema::new(vec![
-        Field::new("nanos", arr_nanos.data_type().clone(), true),
-        Field::new("micros", arr_micros.data_type().clone(), true),
-        Field::new("millis", arr_millis.data_type().clone(), true),
-        Field::new("seconds", arr_seconds.data_type().clone(), true),
-        Field::new("name", arr_names.data_type().clone(), true),
-    ]);
-    let schema = Arc::new(schema);
-
-    RecordBatch::try_new(
-        schema,
-        vec![
-            Arc::new(arr_nanos),
-            Arc::new(arr_micros),
-            Arc::new(arr_millis),
-            Arc::new(arr_seconds),
-            Arc::new(arr_names),
-        ],
-    )
-    .unwrap()
-}
-
-/// Return record batch with i32 sequence
-///
-/// Columns are named
-/// "i" -> Int32Array
-fn make_int32_batch(start: i32, end: i32) -> RecordBatch {
-    let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
-    let v: Vec<i32> = (start..end).collect();
-    let array = Arc::new(Int32Array::from(v)) as ArrayRef;
-    RecordBatch::try_new(schema, vec![array.clone()]).unwrap()
-}
-
-/// Return record batch with f64 vector
-///
-/// Columns are named
-/// "f" -> Float64Array
-fn make_f64_batch(v: Vec<f64>) -> RecordBatch {
-    let schema = Arc::new(Schema::new(vec![Field::new("f", DataType::Float64, true)]));
-    let array = Arc::new(Float64Array::from(v)) as ArrayRef;
-    RecordBatch::try_new(schema, vec![array.clone()]).unwrap()
-}
-
-/// Return record batch with decimal vector
-///
-/// Columns are named
-/// "decimal_col" -> DecimalArray
-fn make_decimal_batch(v: Vec<i128>, precision: u8, scale: u8) -> RecordBatch {
-    let schema = Arc::new(Schema::new(vec![Field::new(
-        "decimal_col",
-        DataType::Decimal128(precision, scale),
-        true,
-    )]));
-    let array = Arc::new(
-        Decimal128Array::from_iter_values(v)
-            .with_precision_and_scale(precision, scale)
-            .unwrap(),
-    ) as ArrayRef;
-    RecordBatch::try_new(schema, vec![array.clone()]).unwrap()
-}
-
-/// Return record batch with a few rows of data for all of the supported date
-/// types with the specified offset (in days)
-///
-/// Columns are named:
-/// "date32" --> Date32Array
-/// "date64" --> Date64Array
-/// "names" --> StringArray
-fn make_date_batch(offset: Duration) -> RecordBatch {
-    let date_strings = vec![
-        Some("2020-01-01"),
-        Some("2020-01-02"),
-        Some("2020-01-03"),
-        None,
-        Some("2020-01-04"),
-    ];
-
-    let names = date_strings
-        .iter()
-        .enumerate()
-        .map(|(i, val)| format!("Row {} + {}: {:?}", i, offset, val))
-        .collect::<Vec<_>>();
-
-    // Copied from `cast.rs` cast kernel due to lack of temporal kernels
-    // https://github.com/apache/arrow-rs/issues/527
-    const EPOCH_DAYS_FROM_CE: i32 = 719_163;
-
-    let date_seconds = date_strings
-        .iter()
-        .map(|t| {
-            t.map(|t| {
-                let t = t.parse::<chrono::NaiveDate>().unwrap();
-                let t = t + offset;
-                t.num_days_from_ce() - EPOCH_DAYS_FROM_CE
-            })
-        })
-        .collect::<Vec<_>>();
-
-    let date_millis = date_strings
-        .into_iter()
-        .map(|t| {
-            t.map(|t| {
-                let t = t
-                    .parse::<chrono::NaiveDate>()
-                    .unwrap()
-                    .and_time(chrono::NaiveTime::from_hms_opt(0, 0, 0).unwrap());
-                let t = t + offset;
-                t.timestamp_millis()
-            })
-        })
-        .collect::<Vec<_>>();
-
-    let arr_date32 = Date32Array::from(date_seconds);
-    let arr_date64 = Date64Array::from(date_millis);
-
-    let names = names.iter().map(|s| s.as_str()).collect::<Vec<_>>();
-    let arr_names = StringArray::from(names);
-
-    let schema = Schema::new(vec![
-        Field::new("date32", arr_date32.data_type().clone(), true),
-        Field::new("date64", arr_date64.data_type().clone(), true),
-        Field::new("name", arr_names.data_type().clone(), true),
-    ]);
-    let schema = Arc::new(schema);
-
-    RecordBatch::try_new(
-        schema,
-        vec![
-            Arc::new(arr_date32),
-            Arc::new(arr_date64),
-            Arc::new(arr_names),
-        ],
-    )
-    .unwrap()
-}
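The fixture removed above (ContextWithParquet, TestOutput, and the make_*_batch helpers) appears to have been consolidated into the shared crate::parquet module imported at the top of this file, so the row-group and page-pruning tests can share one implementation. Below is a minimal usage sketch, assuming the consolidated fixture keeps the methods and fields shown in the deleted code (query, row_groups_pruned, result_rows); the query string and printed output are placeholders, not expectations taken from this commit.

    use crate::parquet::Unit::RowGroup;
    use crate::parquet::{ContextWithParquet, Scenario};

    // Sketch only: would sit alongside the other async tests in this suite.
    async fn row_group_pruning_sketch() {
        let output = ContextWithParquet::new(Scenario::Decimal, RowGroup)
            .await
            .query("SELECT * FROM t where decimal_col > 4")
            .await;
        println!(
            "row groups pruned: {:?}, rows returned: {}",
            output.row_groups_pruned(),
            output.result_rows
        );
    }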
diff --git a/test-utils/src/data_gen.rs b/test-utils/src/data_gen.rs
index adff4a514..c82d56ef2 100644
--- a/test-utils/src/data_gen.rs
+++ b/test-utils/src/data_gen.rs
@@ -19,8 +19,8 @@ use std::ops::Range;
 use std::sync::Arc;
 
 use arrow::array::{
-    Int32Builder, StringBuilder, StringDictionaryBuilder, TimestampNanosecondBuilder,
-    UInt16Builder,
+    Decimal128Builder, Int32Builder, StringBuilder, StringDictionaryBuilder,
+    TimestampNanosecondBuilder, UInt16Builder,
 };
 use arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef, TimeUnit};
 use arrow::record_batch::RecordBatch;
@@ -43,6 +43,7 @@ struct BatchBuilder {
     request_bytes: Int32Builder,
     response_bytes: Int32Builder,
     response_status: UInt16Builder,
+    prices_status: Decimal128Builder,
 
     /// optional number of rows produced
     row_limit: Option<usize>,
@@ -73,6 +74,7 @@ impl BatchBuilder {
             Field::new("request_bytes", DataType::Int32, true),
             Field::new("response_bytes", DataType::Int32, true),
             Field::new("response_status", DataType::UInt16, false),
+            Field::new("decimal_price", DataType::Decimal128(38, 0), false),
         ]))
     }
 
@@ -146,6 +148,7 @@ impl BatchBuilder {
             .append_option(rng.gen_bool(0.9).then(|| rng.gen()));
         self.response_status
             .append_value(status[rng.gen_range(0..status.len())]);
+        self.prices_status.append_value(self.row_count as i128);
     }
 
     fn finish(mut self, schema: SchemaRef) -> RecordBatch {
@@ -166,6 +169,12 @@ impl BatchBuilder {
                 Arc::new(self.request_bytes.finish()),
                 Arc::new(self.response_bytes.finish()),
                 Arc::new(self.response_status.finish()),
+                Arc::new(
+                    self.prices_status
+                        .finish()
+                        .with_precision_and_scale(38, 0)
+                        .unwrap(),
+                ),
             ],
         )
         .unwrap()
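The new decimal_price column appends the running row count as an unscaled i128 and only attaches the Decimal128(38, 0) precision and scale when the builder is finished. Below is a short, self-contained sketch of the same pattern using Decimal128Array::from_iter_values (the constructor the pruning fixtures above rely on); the values are arbitrary.

    use arrow::array::{Array, Decimal128Array};

    fn main() {
        // unscaled i128 values; precision and scale are attached afterwards,
        // mirroring the finish().with_precision_and_scale(38, 0) call above
        let prices = Decimal128Array::from_iter_values(vec![0_i128, 1, 2, 3, 4])
            .with_precision_and_scale(38, 0)
            .unwrap();
        assert_eq!(prices.len(), 5);
        println!("{:?}", prices.data_type()); // Decimal128(38, 0)
    }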