Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/22 12:07:19 UTC
[arrow-datafusion] branch master updated: Support parquet page filtering on min_max for `decimal128` and `string` columns (#4255)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new d7a7fb61a Support parquet page filtering on min_max for `decimal128` and `string` columns (#4255)
d7a7fb61a is described below
commit d7a7fb61afe9ce2824aae737f65aec12d9513f7f
Author: Yang Jiang <ya...@ebay.com>
AuthorDate: Tue Nov 22 20:07:13 2022 +0800
Support parquet page filtering on min_max for `decimal128` and `string` columns (#4255)
* Support parquet page filtering for string columns
Signed-off-by: yangjiang <ya...@ebay.com>
* Support parquet page filtering on min_max for decimal128 columns
Signed-off-by: yangjiang <ya...@ebay.com>
* Update datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
* Avoid unwrap
Signed-off-by: yangjiang <ya...@ebay.com>
* reorg test code
Signed-off-by: yangjiang <ya...@ebay.com>
* add test for page index
Signed-off-by: yangjiang <ya...@ebay.com>
* fix comment
Signed-off-by: yangjiang <ya...@ebay.com>
* remove code
Signed-off-by: yangjiang <ya...@ebay.com>
Signed-off-by: yangjiang <ya...@ebay.com>
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
.../core/src/physical_plan/file_format/parquet.rs | 41 +-
.../file_format/parquet/page_filter.rs | 90 +++-
.../file_format/parquet/row_groups.rs | 44 +-
datafusion/core/tests/parquet/filter_pushdown.rs | 53 +-
datafusion/core/tests/parquet/mod.rs | 538 +++++++++++++++++++++
datafusion/core/tests/parquet/page_pruning.rs | 512 +++++++++++++++++++-
datafusion/core/tests/parquet/row_group_pruning.rs | 500 +------------------
test-utils/src/data_gen.rs | 13 +-
8 files changed, 1233 insertions(+), 558 deletions(-)
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 2ba8203a5..fa68f3072 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -17,7 +17,7 @@
//! Execution plan for reading Parquet files
-use arrow::datatypes::SchemaRef;
+use arrow::datatypes::{DataType, SchemaRef};
use fmt::Debug;
use std::any::Any;
use std::fmt;
@@ -55,8 +55,10 @@ use object_store::{ObjectMeta, ObjectStore};
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask};
+use parquet::basic::{ConvertedType, LogicalType};
use parquet::errors::ParquetError;
use parquet::file::{metadata::ParquetMetaData, properties::WriterProperties};
+use parquet::schema::types::ColumnDescriptor;
mod metrics;
mod page_filter;
@@ -674,6 +676,43 @@ pub async fn plan_to_parquet(
}
}
+// TODO: consolidate code with arrow-rs
+// Convert a byte array to i128.
+// The input byte array must be big-endian.
+// Copied from arrow-rs.
+pub(crate) fn from_bytes_to_i128(b: &[u8]) -> i128 {
+ assert!(b.len() <= 16, "Decimal128Array supports only up to size 16");
+ let first_bit = b[0] & 128u8 == 128u8;
+ let mut result = if first_bit { [255u8; 16] } else { [0u8; 16] };
+ for (i, v) in b.iter().enumerate() {
+ result[i + (16 - b.len())] = *v;
+ }
+ // The byte array comes from the parquet file and must be big-endian;
+ // the endianness is defined by the parquet format, see
+ // https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66
+ i128::from_be_bytes(result)
+}
+
+// Convert a parquet column schema to an arrow data type, considering only
+// the decimal data type.
+pub(crate) fn parquet_to_arrow_decimal_type(
+ parquet_column: &ColumnDescriptor,
+) -> Option<DataType> {
+ let type_ptr = parquet_column.self_type_ptr();
+ match type_ptr.get_basic_info().logical_type() {
+ Some(LogicalType::Decimal { scale, precision }) => {
+ Some(DataType::Decimal128(precision as u8, scale as u8))
+ }
+ _ => match type_ptr.get_basic_info().converted_type() {
+ ConvertedType::DECIMAL => Some(DataType::Decimal128(
+ type_ptr.get_precision() as u8,
+ type_ptr.get_scale() as u8,
+ )),
+ _ => None,
+ },
+ }
+}
+
#[cfg(test)]
mod tests {
// See also `parquet_exec` integration test
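[Editor's note: the sign-extension in `from_bytes_to_i128` above is the crux of
reading decimal statistics stored as big-endian FIXED_LEN_BYTE_ARRAY. A minimal
standalone sketch of the same logic; the helper name and the asserts are
illustrative, not part of the commit:

    // Sign-extend a big-endian byte slice (at most 16 bytes) into i128.
    fn be_bytes_to_i128(b: &[u8]) -> i128 {
        assert!(b.len() <= 16, "Decimal128 supports at most 16 bytes");
        // If the sign bit of the most significant byte is set, pad with 0xFF
        // so the value stays negative after widening; otherwise pad with 0x00.
        let mut buf = if b[0] & 0x80 != 0 { [0xFFu8; 16] } else { [0u8; 16] };
        buf[16 - b.len()..].copy_from_slice(b);
        i128::from_be_bytes(buf)
    }

    fn main() {
        assert_eq!(be_bytes_to_i128(&[0x01, 0x90]), 400); // 4.00 at scale 2
        assert_eq!(be_bytes_to_i128(&[0xff]), -1);        // sign extension
    }
]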
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
index 5f31a6a49..a1cf03666 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
@@ -17,11 +17,16 @@
//! Contains code to filter entire pages
-use arrow::array::{BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array};
+use arrow::array::{
+ BooleanArray, Decimal128Array, Float32Array, Float64Array, Int32Array, Int64Array,
+ StringArray,
+};
+use arrow::datatypes::DataType;
use arrow::{array::ArrayRef, datatypes::SchemaRef, error::ArrowError};
use datafusion_common::{Column, DataFusionError, Result};
use datafusion_optimizer::utils::split_conjunction;
use log::{debug, error, trace};
+use parquet::schema::types::ColumnDescriptor;
use parquet::{
arrow::arrow_reader::{RowSelection, RowSelector},
errors::ParquetError,
@@ -35,6 +40,9 @@ use std::collections::VecDeque;
use std::sync::Arc;
use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
+use crate::physical_plan::file_format::parquet::{
+ from_bytes_to_i128, parquet_to_arrow_decimal_type,
+};
use super::metrics::ParquetFileMetrics;
@@ -132,6 +140,7 @@ pub(crate) fn build_page_filter(
&predicate,
rg_offset_indexes.get(col_id),
rg_page_indexes.get(col_id),
+ groups[*r].column(col_id).column_descr(),
file_metrics,
)
.map_err(|e| {
@@ -305,15 +314,18 @@ fn prune_pages_in_one_row_group(
predicate: &PruningPredicate,
col_offset_indexes: Option<&Vec<PageLocation>>,
col_page_indexes: Option<&Index>,
+ col_desc: &ColumnDescriptor,
metrics: &ParquetFileMetrics,
) -> Result<Vec<RowSelector>> {
let num_rows = group.num_rows() as usize;
if let (Some(col_offset_indexes), Some(col_page_indexes)) =
(col_offset_indexes, col_page_indexes)
{
+ let target_type = parquet_to_arrow_decimal_type(col_desc);
let pruning_stats = PagesPruningStatistics {
col_page_indexes,
col_offset_indexes,
+ target_type: &target_type,
};
match predicate.prune(&pruning_stats) {
@@ -382,6 +394,9 @@ fn create_row_count_in_each_page(
struct PagesPruningStatistics<'a> {
col_page_indexes: &'a Index,
col_offset_indexes: &'a Vec<PageLocation>,
+ // target_type is the logical type from the schema (e.g. 'DECIMAL'); the
+ // physical type in the parquet file may be `INT32`, `INT64`, or `FIXED_LEN_BYTE_ARRAY`
+ target_type: &'a Option<DataType>,
}
// Extract the min or max value by calling `func` on the page index
@@ -390,16 +405,48 @@ macro_rules! get_min_max_values_for_page_index {
match $self.col_page_indexes {
Index::NONE => None,
Index::INT32(index) => {
- let vec = &index.indexes;
- Some(Arc::new(Int32Array::from_iter(
- vec.iter().map(|x| x.$func().cloned()),
- )))
+ match $self.target_type {
+ // int32 to decimal with the precision and scale
+ Some(DataType::Decimal128(precision, scale)) => {
+ let vec = &index.indexes;
+ let vec: Vec<Option<i128>> = vec
+ .iter()
+ .map(|x| x.$func().and_then(|x| Some(*x as i128)))
+ .collect();
+ Decimal128Array::from(vec)
+ .with_precision_and_scale(*precision, *scale)
+ .ok()
+ .map(|arr| Arc::new(arr) as ArrayRef)
+ }
+ _ => {
+ let vec = &index.indexes;
+ Some(Arc::new(Int32Array::from_iter(
+ vec.iter().map(|x| x.$func().cloned()),
+ )))
+ }
+ }
}
Index::INT64(index) => {
- let vec = &index.indexes;
- Some(Arc::new(Int64Array::from_iter(
- vec.iter().map(|x| x.$func().cloned()),
- )))
+ match $self.target_type {
+ // int64 to decimal with the precision and scale
+ Some(DataType::Decimal128(precision, scale)) => {
+ let vec = &index.indexes;
+ let vec: Vec<Option<i128>> = vec
+ .iter()
+ .map(|x| x.$func().and_then(|x| Some(*x as i128)))
+ .collect();
+ Decimal128Array::from(vec)
+ .with_precision_and_scale(*precision, *scale)
+ .ok()
+ .map(|arr| Arc::new(arr) as ArrayRef)
+ }
+ _ => {
+ let vec = &index.indexes;
+ Some(Arc::new(Int64Array::from_iter(
+ vec.iter().map(|x| x.$func().cloned()),
+ )))
+ }
+ }
}
Index::FLOAT(index) => {
let vec = &index.indexes;
@@ -419,10 +466,33 @@ macro_rules! get_min_max_values_for_page_index {
vec.iter().map(|x| x.$func().cloned()),
)))
}
- Index::INT96(_) | Index::BYTE_ARRAY(_) | Index::FIXED_LEN_BYTE_ARRAY(_) => {
+ Index::BYTE_ARRAY(index) => {
+ let vec = &index.indexes;
+ let array: StringArray = vec
+ .iter()
+ .map(|x| x.$func())
+ .map(|x| x.and_then(|x| std::str::from_utf8(x).ok()))
+ .collect();
+ Some(Arc::new(array))
+ }
+ Index::INT96(_) => {
// TODO: support this type
None
}
+ Index::FIXED_LEN_BYTE_ARRAY(index) => match $self.target_type {
+ Some(DataType::Decimal128(precision, scale)) => {
+ let vec = &index.indexes;
+ Decimal128Array::from(
+ vec.iter()
+ .map(|x| x.$func().and_then(|x| Some(from_bytes_to_i128(x))))
+ .collect::<Vec<Option<i128>>>(),
+ )
+ .with_precision_and_scale(*precision, *scale)
+ .ok()
+ .map(|arr| Arc::new(arr) as ArrayRef)
+ }
+ _ => None,
+ },
}
}};
}
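[Editor's note: for INT32/INT64 page indexes on decimal columns, the macro
above widens each statistic to i128 and attaches the column's precision and
scale, so the pruning predicate sees genuine Decimal128 values. A standalone
sketch of that widening; the values are made up, the arrow calls mirror the
ones in the diff:

    use arrow::array::{ArrayRef, Decimal128Array};
    use std::sync::Arc;

    fn main() {
        // Page-index mins for three pages of an INT32 column annotated decimal(9,2).
        let mins: Vec<Option<i32>> = vec![Some(100), Some(-500), Some(2000)];
        // Widen i32 -> i128, then attach precision/scale: 100 becomes 1.00.
        let widened: Vec<Option<i128>> =
            mins.iter().map(|x| x.map(|v| v as i128)).collect();
        let arr: Option<ArrayRef> = Decimal128Array::from(widened)
            .with_precision_and_scale(9, 2)
            .ok()
            .map(|a| Arc::new(a) as ArrayRef);
        assert!(arr.is_some());
    }
]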
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs b/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs
index dab2a4226..0f0b37807 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs
@@ -23,16 +23,17 @@ use datafusion_common::Column;
use datafusion_common::ScalarValue;
use log::debug;
-use parquet::{
- file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics},
- schema::types::ColumnDescriptor,
+use parquet::file::{
+ metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics,
};
+use crate::physical_plan::file_format::parquet::{
+ from_bytes_to_i128, parquet_to_arrow_decimal_type,
+};
use crate::{
datasource::listing::FileRange,
physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
};
-use parquet::basic::{ConvertedType, LogicalType};
use super::ParquetFileMetrics;
@@ -85,23 +86,6 @@ struct RowGroupPruningStatistics<'a> {
parquet_schema: &'a Schema,
}
-// TODO: consolidate code with arrow-rs
-// Convert the bytes array to i128.
-// The endian of the input bytes array must be big-endian.
-// Copy from the arrow-rs
-fn from_bytes_to_i128(b: &[u8]) -> i128 {
- assert!(b.len() <= 16, "Decimal128Array supports only up to size 16");
- let first_bit = b[0] & 128u8 == 128u8;
- let mut result = if first_bit { [255u8; 16] } else { [0u8; 16] };
- for (i, v) in b.iter().enumerate() {
- result[i + (16 - b.len())] = *v;
- }
- // The bytes array are from parquet file and must be the big-endian.
- // The endian is defined by parquet format, and the reference document
- // https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66
- i128::from_be_bytes(result)
-}
-
/// Extract the min/max statistics from a `ParquetStatistics` object
macro_rules! get_statistic {
($column_statistics:expr, $func:ident, $bytes_func:ident, $target_arrow_type:expr) => {{
@@ -217,24 +201,6 @@ macro_rules! get_null_count_values {
}};
}
-// Convert parquet column schema to arrow data type, and just consider the
-// decimal data type.
-fn parquet_to_arrow_decimal_type(parquet_column: &ColumnDescriptor) -> Option<DataType> {
- let type_ptr = parquet_column.self_type_ptr();
- match type_ptr.get_basic_info().logical_type() {
- Some(LogicalType::Decimal { scale, precision }) => {
- Some(DataType::Decimal128(precision as u8, scale as u8))
- }
- _ => match type_ptr.get_basic_info().converted_type() {
- ConvertedType::DECIMAL => Some(DataType::Decimal128(
- type_ptr.get_precision() as u8,
- type_ptr.get_scale() as u8,
- )),
- _ => None,
- },
- }
-}
-
impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
get_min_max_values!(self, column, min, min_bytes)
diff --git a/datafusion/core/tests/parquet/filter_pushdown.rs b/datafusion/core/tests/parquet/filter_pushdown.rs
index 657f00d0c..999becafd 100644
--- a/datafusion/core/tests/parquet/filter_pushdown.rs
+++ b/datafusion/core/tests/parquet/filter_pushdown.rs
@@ -266,20 +266,17 @@ async fn single_file_small_data_pages() {
// page 3: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: djzdyiecnumrsrcbizwlqzdhnpoiqdh, max: fktdcgtmzvoedpwhfevcvvrtaurzgex, num_nulls not defined] CRC:[none] SZ:7 VC:9216
// page 4: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: fktdcgtmzvoedpwhfevcvvrtaurzgex, max: fwtdpgtxwqkkgtgvthhwycrvjiizdifyp, num_nulls not defined] CRC:[none] SZ:7 VC:9216
// page 5: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: fwtdpgtxwqkkgtgvthhwycrvjiizdifyp, max: iadnalqpdzthpifrvewossmpqibgtsuin, num_nulls not defined] CRC:[none] SZ:7 VC:7739
- //
- // This test currently fails due to https://github.com/apache/arrow-datafusion/issues/3833
- // (page index pruning not implemented for byte array)
-
- // TestCase::new(&test_parquet_file)
- // .with_name("selective")
- // // predicate is chosen carefully to prune pages 0, 1, 2, 3, 4
- // // pod = 'iadnalqpdzthpifrvewossmpqibgtsuin'
- // .with_filter(col("pod").eq(lit("iadnalqpdzthpifrvewossmpqibgtsuin")))
- // .with_pushdown_expected(PushdownExpected::Some)
- // .with_page_index_filtering_expected(PageIndexFilteringExpected::Some)
- // .with_expected_rows(2574)
- // .run()
- // .await;
+
+ TestCase::new(&test_parquet_file)
+ .with_name("selective")
+ // predicate is chosen carefully to prune pages 0, 1, 2, 3, 4
+ // pod = 'iadnalqpdzthpifrvewossmpqibgtsuin'
+ .with_filter(col("pod").eq(lit("iadnalqpdzthpifrvewossmpqibgtsuin")))
+ .with_pushdown_expected(PushdownExpected::Some)
+ .with_page_index_filtering_expected(PageIndexFilteringExpected::Some)
+ .with_expected_rows(2574)
+ .run()
+ .await;
// time TV=53819 RL=0 DL=0 DS: 7092 DE:PLAIN
// --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
@@ -299,6 +296,34 @@ async fn single_file_small_data_pages() {
.with_expected_rows(9745)
.run()
.await;
+
+ // decimal_price TV=53819 RL=0 DL=0
+ // ----------------------------------------------------------------------------
+ // row group 0:
+ // column index for column decimal_price:
+ // Boundary order: UNORDERED
+ // null count min max
+ // page-0 0 1 9216
+ // page-1 0 9217 18432
+ // page-2 0 18433 27648
+ // page-3 0 27649 36864
+ // page-4 0 36865 46080
+ // page-5 0 46081 53819
+ //
+ // offset index for column decimal_price:
+ // offset compressed size first row index
+ // page-0 5581636 147517 0
+ // page-1 5729153 147517 9216
+ TestCase::new(&test_parquet_file)
+ .with_name("selective_on_decimal")
+ // predicate is chosen carefully to prune pages 1, 2, 3, 4, and 5
+ // decimal_price <= 9200
+ .with_filter(col("decimal_price").lt_eq(lit(9200)))
+ .with_pushdown_expected(PushdownExpected::Some)
+ .with_page_index_filtering_expected(PageIndexFilteringExpected::Some)
+ .with_expected_rows(9200)
+ .run()
+ .await;
}
/// Expected pushdown behavior
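[Editor's note: the expected row counts in `selective_on_decimal` follow
directly from the page statistics quoted above: `decimal_price` is a 1-based
row counter (see the data_gen.rs change at the end of this diff), so only
page-0, covering [1, 9216], can contain values satisfying
`decimal_price <= 9200`. A sketch of that overlap check; the page bounds are
copied from the comment, the helper is illustrative:

    fn main() {
        let pages = [(1i128, 9216i128), (9217, 18432), (18433, 27648),
                     (27649, 36864), (36865, 46080), (46081, 53819)];
        // A page can satisfy `decimal_price <= 9200` only if its min is <= 9200.
        let kept = pages.iter().filter(|(min, _max)| *min <= 9200).count();
        assert_eq!(kept, 1); // pages 1..=5 pruned, 9200 matching rows remain
    }
]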
diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs
index ab410bd76..255df515c 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -16,7 +16,545 @@
// under the License.
//! Parquet integration tests
+use arrow::array::Decimal128Array;
+use arrow::{
+ array::{
+ Array, ArrayRef, Date32Array, Date64Array, Float64Array, Int32Array, StringArray,
+ TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
+ TimestampSecondArray,
+ },
+ datatypes::{DataType, Field, Schema},
+ record_batch::RecordBatch,
+ util::pretty::pretty_format_batches,
+};
+use chrono::{Datelike, Duration};
+use datafusion::config::OPT_PARQUET_ENABLE_PAGE_INDEX;
+use datafusion::{
+ datasource::{provider_as_source, TableProvider},
+ physical_plan::{
+ accept, file_format::ParquetExec, metrics::MetricsSet, ExecutionPlan,
+ ExecutionPlanVisitor,
+ },
+ prelude::{ParquetReadOptions, SessionConfig, SessionContext},
+};
+use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder};
+use parquet::arrow::ArrowWriter;
+use parquet::file::properties::WriterProperties;
+use std::sync::Arc;
+use tempfile::NamedTempFile;
+
mod custom_reader;
mod filter_pushdown;
mod page_pruning;
mod row_group_pruning;
+
+// ----------------------
+// Begin test fixture
+// ----------------------
+
+/// What data to use
+enum Scenario {
+ Timestamps,
+ Dates,
+ Int32,
+ Float64,
+ Decimal,
+ DecimalLargePrecision,
+}
+
+enum Unit {
+ RowGroup,
+ Page,
+}
+
+/// Test fixture that has an execution context that has an external
+/// table "t" registered, pointing at a parquet file made with
+/// `make_test_file_rg`/`make_test_file_page`
+struct ContextWithParquet {
+ #[allow(dead_code)]
+ /// temp file parquet data is written to. The file is cleaned up
+ /// when dropped
+ file: NamedTempFile,
+ provider: Arc<dyn TableProvider>,
+ ctx: SessionContext,
+}
+
+/// The output of running one of the test cases
+struct TestOutput {
+ /// The input string
+ sql: String,
+ /// Execution metrics for the Parquet Scan
+ parquet_metrics: MetricsSet,
+ /// number of rows in results
+ result_rows: usize,
+ /// the contents of the input, as a string
+ pretty_input: String,
+ /// the raw results, as a string
+ pretty_results: String,
+}
+
+impl TestOutput {
+ /// retrieve the value of the named metric, if any
+ fn metric_value(&self, metric_name: &str) -> Option<usize> {
+ self.parquet_metrics
+ .sum(|metric| metric.value().name() == metric_name)
+ .map(|v| v.as_usize())
+ }
+
+ /// The number of times the pruning predicate evaluation errors
+ fn predicate_evaluation_errors(&self) -> Option<usize> {
+ self.metric_value("predicate_evaluation_errors")
+ }
+
+ /// The number of row groups pruned
+ fn row_groups_pruned(&self) -> Option<usize> {
+ self.metric_value("row_groups_pruned")
+ }
+
+ /// The number of rows pruned via the page index
+ fn row_pages_pruned(&self) -> Option<usize> {
+ self.metric_value("page_index_rows_filtered")
+ }
+
+ fn description(&self) -> String {
+ format!(
+ "Input:\n{}\nQuery:\n{}\nOutput:\n{}\nMetrics:\n{}",
+ self.pretty_input, self.sql, self.pretty_results, self.parquet_metrics,
+ )
+ }
+}
+
+/// Creates an execution context that has an external table "t"
+/// registered, pointing at a parquet file made with `make_test_file_rg` or
+/// `make_test_file_page` for the appropriate scenario
+impl ContextWithParquet {
+ async fn new(scenario: Scenario, unit: Unit) -> Self {
+ Self::with_config(scenario, unit, SessionConfig::new()).await
+ }
+
+ async fn with_config(scenario: Scenario, unit: Unit, config: SessionConfig) -> Self {
+ let file = match unit {
+ Unit::RowGroup => make_test_file_rg(scenario).await,
+ Unit::Page => {
+ config
+ .config_options
+ .write()
+ .set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, true);
+ make_test_file_page(scenario).await
+ }
+ };
+ let parquet_path = file.path().to_string_lossy();
+
+ // now, set up the file as a data source and run a query against it
+ let ctx = SessionContext::with_config(config);
+
+ ctx.register_parquet("t", &parquet_path, ParquetReadOptions::default())
+ .await
+ .unwrap();
+ let provider = ctx.deregister_table("t").unwrap().unwrap();
+ ctx.register_table("t", provider.clone()).unwrap();
+
+ Self {
+ file,
+ provider,
+ ctx,
+ }
+ }
+
+ /// runs a query like "SELECT * from t WHERE <expr>" and returns
+ /// the number of output rows and normalized execution metrics
+ async fn query_with_expr(&mut self, expr: Expr) -> TestOutput {
+ let sql = format!("EXPR only: {:?}", expr);
+ let logical_plan = LogicalPlanBuilder::scan(
+ "t",
+ provider_as_source(self.provider.clone()),
+ None,
+ )
+ .unwrap()
+ .filter(expr)
+ .unwrap()
+ .build()
+ .unwrap();
+ self.run_test(logical_plan, sql).await
+ }
+
+ /// Runs the specified SQL query and returns the number of output
+ /// rows and normalized execution metrics
+ async fn query(&mut self, sql: &str) -> TestOutput {
+ println!("Planning sql {}", sql);
+ let logical_plan = self
+ .ctx
+ .sql(sql)
+ .await
+ .expect("planning")
+ .to_unoptimized_plan();
+ self.run_test(logical_plan, sql).await
+ }
+
+ /// runs the logical plan
+ async fn run_test(
+ &mut self,
+ logical_plan: LogicalPlan,
+ sql: impl Into<String>,
+ ) -> TestOutput {
+ let input = self
+ .ctx
+ .sql("SELECT * from t")
+ .await
+ .expect("planning")
+ .collect()
+ .await
+ .expect("getting input");
+ let pretty_input = pretty_format_batches(&input).unwrap().to_string();
+
+ let logical_plan = self.ctx.optimize(&logical_plan).expect("optimizing plan");
+
+ let physical_plan = self
+ .ctx
+ .create_physical_plan(&logical_plan)
+ .await
+ .expect("creating physical plan");
+
+ let task_ctx = self.ctx.task_ctx();
+ let results = datafusion::physical_plan::collect(physical_plan.clone(), task_ctx)
+ .await
+ .expect("Running");
+
+ // find the parquet metrics
+ struct MetricsFinder {
+ metrics: Option<MetricsSet>,
+ }
+ impl ExecutionPlanVisitor for MetricsFinder {
+ type Error = std::convert::Infallible;
+ fn pre_visit(
+ &mut self,
+ plan: &dyn ExecutionPlan,
+ ) -> Result<bool, Self::Error> {
+ if plan.as_any().downcast_ref::<ParquetExec>().is_some() {
+ self.metrics = plan.metrics();
+ }
+ // stop searching once we have found the metrics
+ Ok(self.metrics.is_none())
+ }
+ }
+ let mut finder = MetricsFinder { metrics: None };
+ accept(physical_plan.as_ref(), &mut finder).unwrap();
+ let parquet_metrics = finder.metrics.unwrap();
+
+ let result_rows = results.iter().map(|b| b.num_rows()).sum();
+
+ let pretty_results = pretty_format_batches(&results).unwrap().to_string();
+
+ let sql = sql.into();
+ TestOutput {
+ sql,
+ parquet_metrics,
+ result_rows,
+ pretty_input,
+ pretty_results,
+ }
+ }
+}
+
+/// Return a record batch with a few rows of data for all of the supported
+/// timestamp types, with values shifted by the specified offset
+///
+/// Columns are named:
+/// "nanos" --> TimestampNanosecondArray
+/// "micros" --> TimestampMicrosecondArray
+/// "millis" --> TimestampMillisecondArray
+/// "seconds" --> TimestampSecondArray
+/// "names" --> StringArray
+fn make_timestamp_batch(offset: Duration) -> RecordBatch {
+ let ts_strings = vec![
+ Some("2020-01-01T01:01:01.0000000000001"),
+ Some("2020-01-01T01:02:01.0000000000001"),
+ Some("2020-01-01T02:01:01.0000000000001"),
+ None,
+ Some("2020-01-02T01:01:01.0000000000001"),
+ ];
+
+ let offset_nanos = offset.num_nanoseconds().expect("non overflow nanos");
+
+ let ts_nanos = ts_strings
+ .into_iter()
+ .map(|t| {
+ t.map(|t| {
+ offset_nanos
+ + t.parse::<chrono::NaiveDateTime>()
+ .unwrap()
+ .timestamp_nanos()
+ })
+ })
+ .collect::<Vec<_>>();
+
+ let ts_micros = ts_nanos
+ .iter()
+ .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000))
+ .collect::<Vec<_>>();
+
+ let ts_millis = ts_nanos
+ .iter()
+ .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000))
+ .collect::<Vec<_>>();
+
+ let ts_seconds = ts_nanos
+ .iter()
+ .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000000))
+ .collect::<Vec<_>>();
+
+ let names = ts_nanos
+ .iter()
+ .enumerate()
+ .map(|(i, _)| format!("Row {} + {}", i, offset))
+ .collect::<Vec<_>>();
+
+ let arr_nanos = TimestampNanosecondArray::from(ts_nanos);
+ let arr_micros = TimestampMicrosecondArray::from(ts_micros);
+ let arr_millis = TimestampMillisecondArray::from(ts_millis);
+ let arr_seconds = TimestampSecondArray::from(ts_seconds);
+
+ let names = names.iter().map(|s| s.as_str()).collect::<Vec<_>>();
+ let arr_names = StringArray::from(names);
+
+ let schema = Schema::new(vec![
+ Field::new("nanos", arr_nanos.data_type().clone(), true),
+ Field::new("micros", arr_micros.data_type().clone(), true),
+ Field::new("millis", arr_millis.data_type().clone(), true),
+ Field::new("seconds", arr_seconds.data_type().clone(), true),
+ Field::new("name", arr_names.data_type().clone(), true),
+ ]);
+ let schema = Arc::new(schema);
+
+ RecordBatch::try_new(
+ schema,
+ vec![
+ Arc::new(arr_nanos),
+ Arc::new(arr_micros),
+ Arc::new(arr_millis),
+ Arc::new(arr_seconds),
+ Arc::new(arr_names),
+ ],
+ )
+ .unwrap()
+}
+
+/// Return record batch with i32 sequence
+///
+/// Columns are named
+/// "i" -> Int32Array
+fn make_int32_batch(start: i32, end: i32) -> RecordBatch {
+ let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
+ let v: Vec<i32> = (start..end).collect();
+ let array = Arc::new(Int32Array::from(v)) as ArrayRef;
+ RecordBatch::try_new(schema, vec![array.clone()]).unwrap()
+}
+
+/// Return record batch with f64 vector
+///
+/// Columns are named
+/// "f" -> Float64Array
+fn make_f64_batch(v: Vec<f64>) -> RecordBatch {
+ let schema = Arc::new(Schema::new(vec![Field::new("f", DataType::Float64, true)]));
+ let array = Arc::new(Float64Array::from(v)) as ArrayRef;
+ RecordBatch::try_new(schema, vec![array.clone()]).unwrap()
+}
+
+/// Return record batch with decimal vector
+///
+/// Columns are named
+/// "decimal_col" -> DecimalArray
+fn make_decimal_batch(v: Vec<i128>, precision: u8, scale: u8) -> RecordBatch {
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "decimal_col",
+ DataType::Decimal128(precision, scale),
+ true,
+ )]));
+ let array = Arc::new(
+ Decimal128Array::from_iter_values(v)
+ .with_precision_and_scale(precision, scale)
+ .unwrap(),
+ ) as ArrayRef;
+ RecordBatch::try_new(schema, vec![array.clone()]).unwrap()
+}
+
+/// Return record batch with a few rows of data for all of the supported date
+/// types with the specified offset (in days)
+///
+/// Columns are named:
+/// "date32" --> Date32Array
+/// "date64" --> Date64Array
+/// "names" --> StringArray
+fn make_date_batch(offset: Duration) -> RecordBatch {
+ let date_strings = vec![
+ Some("2020-01-01"),
+ Some("2020-01-02"),
+ Some("2020-01-03"),
+ None,
+ Some("2020-01-04"),
+ ];
+
+ let names = date_strings
+ .iter()
+ .enumerate()
+ .map(|(i, val)| format!("Row {} + {}: {:?}", i, offset, val))
+ .collect::<Vec<_>>();
+
+ // Copied from `cast.rs` cast kernel due to lack of temporal kernels
+ // https://github.com/apache/arrow-rs/issues/527
+ const EPOCH_DAYS_FROM_CE: i32 = 719_163;
+
+ let date_seconds = date_strings
+ .iter()
+ .map(|t| {
+ t.map(|t| {
+ let t = t.parse::<chrono::NaiveDate>().unwrap();
+ let t = t + offset;
+ t.num_days_from_ce() - EPOCH_DAYS_FROM_CE
+ })
+ })
+ .collect::<Vec<_>>();
+
+ let date_millis = date_strings
+ .into_iter()
+ .map(|t| {
+ t.map(|t| {
+ let t = t
+ .parse::<chrono::NaiveDate>()
+ .unwrap()
+ .and_time(chrono::NaiveTime::from_hms_opt(0, 0, 0).unwrap());
+ let t = t + offset;
+ t.timestamp_millis()
+ })
+ })
+ .collect::<Vec<_>>();
+
+ let arr_date32 = Date32Array::from(date_seconds);
+ let arr_date64 = Date64Array::from(date_millis);
+
+ let names = names.iter().map(|s| s.as_str()).collect::<Vec<_>>();
+ let arr_names = StringArray::from(names);
+
+ let schema = Schema::new(vec![
+ Field::new("date32", arr_date32.data_type().clone(), true),
+ Field::new("date64", arr_date64.data_type().clone(), true),
+ Field::new("name", arr_names.data_type().clone(), true),
+ ]);
+ let schema = Arc::new(schema);
+
+ RecordBatch::try_new(
+ schema,
+ vec![
+ Arc::new(arr_date32),
+ Arc::new(arr_date64),
+ Arc::new(arr_names),
+ ],
+ )
+ .unwrap()
+}
+
+fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
+ match scenario {
+ Scenario::Timestamps => {
+ vec![
+ make_timestamp_batch(Duration::seconds(0)),
+ make_timestamp_batch(Duration::seconds(10)),
+ make_timestamp_batch(Duration::minutes(10)),
+ make_timestamp_batch(Duration::days(10)),
+ ]
+ }
+ Scenario::Dates => {
+ vec![
+ make_date_batch(Duration::days(0)),
+ make_date_batch(Duration::days(10)),
+ make_date_batch(Duration::days(300)),
+ make_date_batch(Duration::days(3600)),
+ ]
+ }
+ Scenario::Int32 => {
+ vec![
+ make_int32_batch(-5, 0),
+ make_int32_batch(-4, 1),
+ make_int32_batch(0, 5),
+ make_int32_batch(5, 10),
+ ]
+ }
+ Scenario::Float64 => {
+ vec![
+ make_f64_batch(vec![-5.0, -4.0, -3.0, -2.0, -1.0]),
+ make_f64_batch(vec![-4.0, -3.0, -2.0, -1.0, 0.0]),
+ make_f64_batch(vec![0.0, 1.0, 2.0, 3.0, 4.0]),
+ make_f64_batch(vec![5.0, 6.0, 7.0, 8.0, 9.0]),
+ ]
+ }
+ Scenario::Decimal => {
+ // decimal record batch
+ vec![
+ make_decimal_batch(vec![100, 200, 300, 400, 600], 9, 2),
+ make_decimal_batch(vec![-500, 100, 300, 400, 600], 9, 2),
+ make_decimal_batch(vec![2000, 3000, 3000, 4000, 6000], 9, 2),
+ ]
+ }
+ Scenario::DecimalLargePrecision => {
+ // decimal record batch with large precision,
+ // and the data will be stored as FIXED_LEN_BYTE_ARRAY
+ vec![
+ make_decimal_batch(vec![100, 200, 300, 400, 600], 38, 2),
+ make_decimal_batch(vec![-500, 100, 300, 400, 600], 38, 2),
+ make_decimal_batch(vec![2000, 3000, 3000, 4000, 6000], 38, 2),
+ ]
+ }
+ }
+}
+
+/// Create a test parquet file with various data types
+async fn make_test_file_rg(scenario: Scenario) -> NamedTempFile {
+ let mut output_file = tempfile::Builder::new()
+ .prefix("parquet_pruning")
+ .suffix(".parquet")
+ .tempfile()
+ .expect("tempfile creation");
+
+ let props = WriterProperties::builder()
+ .set_max_row_group_size(5)
+ .build();
+
+ let batches = create_data_batch(scenario);
+
+ let schema = batches[0].schema();
+
+ let mut writer = ArrowWriter::try_new(&mut output_file, schema, Some(props)).unwrap();
+
+ for batch in batches {
+ writer.write(&batch).expect("writing batch");
+ }
+ writer.close().unwrap();
+
+ output_file
+}
+
+async fn make_test_file_page(scenario: Scenario) -> NamedTempFile {
+ let mut output_file = tempfile::Builder::new()
+ .prefix("parquet_page_pruning")
+ .suffix(".parquet")
+ .tempfile()
+ .expect("tempfile creation");
+
+ // limit each data page to 5 rows; should give the same results as the row-group tests
+ let props = WriterProperties::builder()
+ .set_data_page_row_count_limit(5)
+ .set_write_batch_size(5)
+ .build();
+
+ let batches = create_data_batch(scenario);
+
+ let schema = batches[0].schema();
+
+ let mut writer = ArrowWriter::try_new(&mut output_file, schema, Some(props)).unwrap();
+
+ for batch in batches {
+ writer.write(&batch).expect("writing batch");
+ }
+ writer.close().unwrap();
+ output_file
+}
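[Editor's note: with `set_data_page_row_count_limit(5)` and
`set_write_batch_size(5)`, each 5-row batch lands in its own page, mirroring
the 5-row row groups written by `make_test_file_rg`, so page pruning removes
rows in multiples of 5. Back-of-envelope for the timestamp scenarios (assumed
layout: 4 batches x 5 rows per column):

    fn main() {
        let total_rows = 4 * 5;  // four 5-row batches per column
        let rows_pruned = 5;     // e.g. prune_timestamps_* expects Some(5)
        assert_eq!(total_rows - rows_pruned, 15); // rows scanned; 10 match the predicate
    }
]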
diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs
index 8d8c3bcae..8c67bc346 100644
--- a/datafusion/core/tests/parquet/page_pruning.rs
+++ b/datafusion/core/tests/parquet/page_pruning.rs
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+use crate::parquet::Unit::Page;
+use crate::parquet::{ContextWithParquet, Scenario};
use datafusion::config::ConfigOptions;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
@@ -23,7 +25,7 @@ use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
-use datafusion_common::Statistics;
+use datafusion_common::{ScalarValue, Statistics};
use datafusion_expr::{col, lit, Expr};
use object_store::path::Path;
use object_store::ObjectMeta;
@@ -142,6 +144,22 @@ async fn page_index_filter_one_col() {
// should be the same as `month = 1`
assert_eq!(batch.num_rows(), 645);
+
+ let session_ctx = SessionContext::new();
+ let task_ctx = session_ctx.task_ctx();
+
+ // 5. create filter date_string_col = '01/01/09';
+ let filter = col("date_string_col").eq(lit("01/01/09"));
+ let parquet_exec = get_parquet_exec(filter, session_ctx.clone()).await;
+ let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
+ let batch = results.next().await.unwrap().unwrap();
+
+ // only two pages should match the filter
+ // min max
+ // page-20 0 01/01/09 01/02/09
+ // page-21 0 01/01/09 01/01/09
+ // each 7 rows
+ assert_eq!(batch.num_rows(), 14);
}
#[tokio::test]
@@ -204,3 +222,495 @@ async fn page_index_filter_multi_col() {
let batch = results.next().await.unwrap().unwrap();
assert_eq!(batch.num_rows(), 7300);
}
+
+async fn test_prune(
+ case_data_type: Scenario,
+ sql: &str,
+ expected_errors: Option<usize>,
+ expected_row_pages_pruned: Option<usize>,
+ expected_results: usize,
+) {
+ let output = ContextWithParquet::new(case_data_type, Page)
+ .await
+ .query(sql)
+ .await;
+
+ println!("{}", output.description());
+ assert_eq!(output.predicate_evaluation_errors(), expected_errors);
+ assert_eq!(output.row_pages_pruned(), expected_row_pages_pruned);
+ assert_eq!(
+ output.result_rows,
+ expected_results,
+ "{}",
+ output.description()
+ );
+}
+
+#[tokio::test]
+// null count min max
+// page-0 1 2020-01-01T01:01:01.000000000 2020-01-02T01:01:01.000000000
+// page-1 1 2020-01-01T01:01:11.000000000 2020-01-02T01:01:11.000000000
+// page-2 1 2020-01-01T01:11:01.000000000 2020-01-02T01:11:01.000000000
+// page-3 1 2020-01-11T01:01:01.000000000 2020-01-12T01:01:01.000000000
+async fn prune_timestamps_nanos() {
+ test_prune(
+ Scenario::Timestamps,
+ "SELECT * FROM t where nanos < to_timestamp('2020-01-02 01:01:11Z')",
+ Some(0),
+ Some(5),
+ 10,
+ )
+ .await;
+}
+
+#[tokio::test]
+// null count min max
+// page-0 1 2020-01-01T01:01:01.000000 2020-01-02T01:01:01.000000
+// page-1 1 2020-01-01T01:01:11.000000 2020-01-02T01:01:11.000000
+// page-2 1 2020-01-01T01:11:01.000000 2020-01-02T01:11:01.000000
+// page-3 1 2020-01-11T01:01:01.000000 2020-01-12T01:01:01.000000
+async fn prune_timestamps_micros() {
+ test_prune(
+ Scenario::Timestamps,
+ "SELECT * FROM t where micros < to_timestamp_micros('2020-01-02 01:01:11Z')",
+ Some(0),
+ Some(5),
+ 10,
+ )
+ .await;
+}
+
+#[tokio::test]
+// null count min max
+// page-0 1 2020-01-01T01:01:01.000 2020-01-02T01:01:01.000
+// page-1 1 2020-01-01T01:01:11.000 2020-01-02T01:01:11.000
+// page-2 1 2020-01-01T01:11:01.000 2020-01-02T01:11:01.000
+// page-3 1 2020-01-11T01:01:01.000 2020-01-12T01:01:01.000
+async fn prune_timestamps_millis() {
+ test_prune(
+ Scenario::Timestamps,
+ "SELECT * FROM t where millis < to_timestamp_millis('2020-01-02 01:01:11Z')",
+ Some(0),
+ Some(5),
+ 10,
+ )
+ .await;
+}
+
+#[tokio::test]
+// null count min max
+// page-0 1 1577840461 1577926861
+// page-1 1 1577840471 1577926871
+// page-2 1 1577841061 1577927461
+// page-3 1 1578704461 1578790861
+
+async fn prune_timestamps_seconds() {
+ test_prune(
+ Scenario::Timestamps,
+ "SELECT * FROM t where seconds < to_timestamp_seconds('2020-01-02 01:01:11Z')",
+ Some(0),
+ Some(5),
+ 10,
+ )
+ .await;
+}
+
+#[tokio::test]
+// null count min max
+// page-0 1 2020-01-01 2020-01-04
+// page-1 1 2020-01-11 2020-01-14
+// page-2 1 2020-10-27 2020-10-30
+// page-3 1 2029-11-09 2029-11-12
+async fn prune_date32() {
+ test_prune(
+ Scenario::Dates,
+ "SELECT * FROM t where date32 < cast('2020-01-02' as date)",
+ Some(0),
+ Some(15),
+ 1,
+ )
+ .await;
+}
+
+#[tokio::test]
+// null count min max
+// page-0 1 2020-01-01 2020-01-04
+// page-1 1 2020-01-11 2020-01-14
+// page-2 1 2020-10-27 2020-10-30
+// page-3 1 2029-11-09 2029-11-12
+async fn prune_date64() {
+ // workaround for not being able to cast Date32 to Date64 automatically
+ let date = "2020-01-02"
+ .parse::<chrono::NaiveDate>()
+ .unwrap()
+ .and_time(chrono::NaiveTime::from_hms_opt(0, 0, 0).unwrap());
+ let date = ScalarValue::Date64(Some(date.timestamp_millis()));
+
+ let output = ContextWithParquet::new(Scenario::Dates, Page)
+ .await
+ .query_with_expr(col("date64").lt(lit(date)))
+ .await;
+
+ println!("{}", output.description());
+ // This should prune out groups without error
+ assert_eq!(output.predicate_evaluation_errors(), Some(0));
+ assert_eq!(output.row_pages_pruned(), Some(15));
+ assert_eq!(output.result_rows, 1, "{}", output.description());
+}
+
+#[tokio::test]
+// null count min max
+// page-0 0 -5 -1
+// page-1 0 -4 0
+// page-2 0 0 4
+// page-3 0 5 9
+async fn prune_int32_lt() {
+ test_prune(
+ Scenario::Int32,
+ "SELECT * FROM t where i < 1",
+ Some(0),
+ Some(5),
+ 11,
+ )
+ .await;
+ // the result of "SELECT * FROM t where i < 1" is the same as that of
+ // "SELECT * FROM t where -i > -1"
+ test_prune(
+ Scenario::Int32,
+ "SELECT * FROM t where -i > -1",
+ Some(0),
+ Some(5),
+ 11,
+ )
+ .await;
+}
+
+#[tokio::test]
+async fn prune_int32_gt() {
+ test_prune(
+ Scenario::Int32,
+ "SELECT * FROM t where i > 8",
+ Some(0),
+ Some(15),
+ 1,
+ )
+ .await;
+
+ test_prune(
+ Scenario::Int32,
+ "SELECT * FROM t where -i < -8",
+ Some(0),
+ Some(15),
+ 1,
+ )
+ .await;
+}
+
+#[tokio::test]
+async fn prune_int32_eq() {
+ test_prune(
+ Scenario::Int32,
+ "SELECT * FROM t where i = 1",
+ Some(0),
+ Some(15),
+ 1,
+ )
+ .await;
+}
+#[tokio::test]
+#[ignore]
+async fn prune_int32_scalar_fun_and_eq() {
+ test_prune(
+ Scenario::Int32,
+ "SELECT * FROM t where abs(i) = 1 and i = 1",
+ Some(0),
+ Some(15),
+ 1,
+ )
+ .await;
+}
+
+#[tokio::test]
+#[ignore]
+async fn prune_int32_scalar_fun() {
+ test_prune(
+ Scenario::Int32,
+ "SELECT * FROM t where abs(i) = 1",
+ Some(0),
+ Some(0),
+ 3,
+ )
+ .await;
+}
+
+#[tokio::test]
+#[ignore]
+async fn prune_int32_complex_expr() {
+ test_prune(
+ Scenario::Int32,
+ "SELECT * FROM t where i+1 = 1",
+ Some(0),
+ Some(0),
+ 2,
+ )
+ .await;
+}
+
+#[tokio::test]
+#[ignore]
+async fn prune_int32_complex_expr_subtract() {
+ test_prune(
+ Scenario::Int32,
+ "SELECT * FROM t where 1-i > 1",
+ Some(0),
+ Some(0),
+ 9,
+ )
+ .await;
+}
+
+#[tokio::test]
+// null count min max
+// page-0 0 -5.0 -1.0
+// page-1 0 -4.0 0.0
+// page-2 0 0.0 4.0
+// page-3 0 5.0 9.0
+async fn prune_f64_lt() {
+ test_prune(
+ Scenario::Float64,
+ "SELECT * FROM t where f < 1",
+ Some(0),
+ Some(5),
+ 11,
+ )
+ .await;
+ test_prune(
+ Scenario::Float64,
+ "SELECT * FROM t where -f > -1",
+ Some(0),
+ Some(5),
+ 11,
+ )
+ .await;
+}
+
+#[tokio::test]
+#[ignore]
+async fn prune_f64_scalar_fun_and_gt() {
+ // for "SELECT * FROM t where abs(f - 1) <= 0.000001 and f >= 0.1",
+ // only "f >= 0.1" can be used for pruning
+ test_prune(
+ Scenario::Float64,
+ "SELECT * FROM t where abs(f - 1) <= 0.000001 and f >= 0.1",
+ Some(0),
+ Some(2),
+ 1,
+ )
+ .await;
+}
+
+#[tokio::test]
+#[ignore]
+async fn prune_f64_scalar_fun() {
+ // pruning for "SELECT * FROM t where abs(f-1) <= 0.000001" is not supported
+ test_prune(
+ Scenario::Float64,
+ "SELECT * FROM t where abs(f-1) <= 0.000001",
+ Some(0),
+ Some(0),
+ 1,
+ )
+ .await;
+}
+
+#[tokio::test]
+#[ignore]
+async fn prune_f64_complex_expr() {
+ // pruning for "SELECT * FROM t where f+1 > 1.1" is not supported
+ test_prune(
+ Scenario::Float64,
+ "SELECT * FROM t where f+1 > 1.1",
+ Some(0),
+ Some(0),
+ 9,
+ )
+ .await;
+}
+
+#[tokio::test]
+#[ignore]
+async fn prune_f64_complex_expr_subtract() {
+ // pruning for "SELECT * FROM t where 1-f > 1" is not supported
+ test_prune(
+ Scenario::Float64,
+ "SELECT * FROM t where 1-f > 1",
+ Some(0),
+ Some(0),
+ 9,
+ )
+ .await;
+}
+
+#[tokio::test]
+// null count min max
+// page-0 0 -5 -1
+// page-1 0 -4 0
+// page-2 0 0 4
+// page-3 0 5 9
+async fn prune_int32_eq_in_list() {
+ // result of sql "SELECT * FROM t where i in (1)"
+ test_prune(
+ Scenario::Int32,
+ "SELECT * FROM t where i in (1)",
+ Some(0),
+ Some(15),
+ 1,
+ )
+ .await;
+}
+
+#[tokio::test]
+async fn prune_int32_eq_in_list_negated() {
+ // "SELECT * FROM t where i not in (1)" prunes nothing
+ test_prune(
+ Scenario::Int32,
+ "SELECT * FROM t where i not in (1)",
+ Some(0),
+ Some(0),
+ 19,
+ )
+ .await;
+}
+
+#[tokio::test]
+async fn prune_decimal_lt() {
+ // The data type of decimal_col is decimal(9,2)
+ // There are three pages, each with 5 rows:
+ // [1.00, 6.00], [-5.00,6.00], [20.00,60.00]
+ test_prune(
+ Scenario::Decimal,
+ "SELECT * FROM t where decimal_col < 4",
+ Some(0),
+ Some(5),
+ 6,
+ )
+ .await;
+ // compare with the casted decimal value
+ test_prune(
+ Scenario::Decimal,
+ "SELECT * FROM t where decimal_col < cast(4.55 as decimal(20,2))",
+ Some(0),
+ Some(5),
+ 8,
+ )
+ .await;
+
+ // The data type of decimal_col is decimal(38,2)
+ test_prune(
+ Scenario::DecimalLargePrecision,
+ "SELECT * FROM t where decimal_col < 4",
+ Some(0),
+ Some(5),
+ 6,
+ )
+ .await;
+ // compare with the casted decimal value
+ test_prune(
+ Scenario::DecimalLargePrecision,
+ "SELECT * FROM t where decimal_col < cast(4.55 as decimal(20,2))",
+ Some(0),
+ Some(5),
+ 8,
+ )
+ .await;
+}
+
+#[tokio::test]
+async fn prune_decimal_eq() {
+ // The data type of decimal_col is decimal(9,2)
+ // There are three pages:
+ // [1.00, 6.00], [-5.00,6.00], [20.00,60.00]
+ test_prune(
+ Scenario::Decimal,
+ "SELECT * FROM t where decimal_col = 4",
+ Some(0),
+ Some(5),
+ 2,
+ )
+ .await;
+ test_prune(
+ Scenario::Decimal,
+ "SELECT * FROM t where decimal_col = 4.00",
+ Some(0),
+ Some(5),
+ 2,
+ )
+ .await;
+
+ // The data type of decimal_col is decimal(38,2)
+ test_prune(
+ Scenario::DecimalLargePrecision,
+ "SELECT * FROM t where decimal_col = 4",
+ Some(0),
+ Some(5),
+ 2,
+ )
+ .await;
+ test_prune(
+ Scenario::DecimalLargePrecision,
+ "SELECT * FROM t where decimal_col = 4.00",
+ Some(0),
+ Some(5),
+ 2,
+ )
+ .await;
+ test_prune(
+ Scenario::DecimalLargePrecision,
+ "SELECT * FROM t where decimal_col = 30.00",
+ Some(0),
+ Some(10),
+ 2,
+ )
+ .await;
+}
+
+#[tokio::test]
+async fn prune_decimal_in_list() {
+ // The data type of decimal_col is decimal(9,2)
+ // There are three pages:
+ // [1.00, 6.00], [-5.00,6.00], [20.00,60.00]
+ test_prune(
+ Scenario::Decimal,
+ "SELECT * FROM t where decimal_col in (4,3,2,123456789123)",
+ Some(0),
+ Some(5),
+ 5,
+ )
+ .await;
+ test_prune(
+ Scenario::Decimal,
+ "SELECT * FROM t where decimal_col in (4.00,3.00,11.2345,1)",
+ Some(0),
+ Some(5),
+ 6,
+ )
+ .await;
+
+ // The data type of decimal_col is decimal(38,2)
+ test_prune(
+ Scenario::DecimalLargePrecision,
+ "SELECT * FROM t where decimal_col in (4,3,2,123456789123)",
+ Some(0),
+ Some(5),
+ 5,
+ )
+ .await;
+ test_prune(
+ Scenario::DecimalLargePrecision,
+ "SELECT * FROM t where decimal_col in (4.00,3.00,11.2345,1)",
+ Some(0),
+ Some(5),
+ 6,
+ )
+ .await;
+}
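[Editor's note: the decimal expectations above hinge on scale: for
decimal(9,2) the raw i128 values 100..600 represent 1.00..6.00, and a literal
like 4 is rescaled to 400 before comparison. A worked sketch; page ranges are
copied from the test comments:

    fn main() {
        // Rescale the SQL literal 4 to scale 2: 4 * 10^2 = 400.
        let bound = 4i128 * 10i128.pow(2);
        // Raw [min, max] per page: [1.00, 6.00], [-5.00, 6.00], [20.00, 60.00].
        let pages = [(100i128, 600i128), (-500, 600), (2000, 6000)];
        // `decimal_col < 4` can match a page only if its min is below the bound.
        let kept = pages.iter().filter(|(min, _)| *min < bound).count();
        assert_eq!(kept, 2); // one 5-row page pruned => Some(5) in prune_decimal_lt
    }
]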
diff --git a/datafusion/core/tests/parquet/row_group_pruning.rs b/datafusion/core/tests/parquet/row_group_pruning.rs
index c3de01b38..c7e3c533f 100644
--- a/datafusion/core/tests/parquet/row_group_pruning.rs
+++ b/datafusion/core/tests/parquet/row_group_pruning.rs
@@ -18,32 +18,12 @@
//! This file contains an end to end test of parquet pruning. It writes
//! data into a parquet file and then verifies row groups are pruned as
//! expected.
-use std::sync::Arc;
-
-use arrow::array::Decimal128Array;
-use arrow::{
- array::{
- Array, ArrayRef, Date32Array, Date64Array, Float64Array, Int32Array, StringArray,
- TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
- TimestampSecondArray,
- },
- datatypes::{DataType, Field, Schema},
- record_batch::RecordBatch,
- util::pretty::pretty_format_batches,
-};
-use chrono::{Datelike, Duration};
-use datafusion::{
- datasource::{provider_as_source, TableProvider},
- physical_plan::{
- accept, file_format::ParquetExec, metrics::MetricsSet, ExecutionPlan,
- ExecutionPlanVisitor,
- },
- prelude::{ParquetReadOptions, SessionConfig, SessionContext},
- scalar::ScalarValue,
-};
-use datafusion_expr::{col, lit, Expr, LogicalPlan, LogicalPlanBuilder};
-use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};
-use tempfile::NamedTempFile;
+use datafusion::prelude::SessionConfig;
+use datafusion_common::ScalarValue;
+
+use crate::parquet::Unit::RowGroup;
+use crate::parquet::{ContextWithParquet, Scenario};
+use datafusion_expr::{col, lit};
async fn test_prune(
case_data_type: Scenario,
@@ -52,7 +32,7 @@ async fn test_prune(
expected_row_group_pruned: Option<usize>,
expected_results: usize,
) {
- let output = ContextWithParquet::new(case_data_type)
+ let output = ContextWithParquet::new(case_data_type, RowGroup)
.await
.query(sql)
.await;
@@ -137,7 +117,7 @@ async fn prune_date64() {
.and_time(chrono::NaiveTime::from_hms_opt(0, 0, 0).unwrap());
let date = ScalarValue::Date64(Some(date.timestamp_millis()));
- let output = ContextWithParquet::new(Scenario::Dates)
+ let output = ContextWithParquet::new(Scenario::Dates, RowGroup)
.await
.query_with_expr(col("date64").lt(lit(date)))
// .query(
@@ -169,7 +149,7 @@ async fn prune_disabled() {
let expected_rows = 10;
let config = SessionConfig::new().with_parquet_pruning(false);
- let output = ContextWithParquet::with_config(Scenario::Timestamps, config)
+ let output = ContextWithParquet::with_config(Scenario::Timestamps, RowGroup, config)
.await
.query(query)
.await;
@@ -503,465 +483,3 @@ async fn prune_decimal_in_list() {
)
.await;
}
-
-// ----------------------
-// Begin test fixture
-// ----------------------
-
-/// What data to use
-enum Scenario {
- Timestamps,
- Dates,
- Int32,
- Float64,
- Decimal,
- DecimalLargePrecision,
-}
-
-/// Test fixture that has an execution context that has an external
-/// table "t" registered, pointing at a parquet file made with
-/// `make_test_file`
-struct ContextWithParquet {
- #[allow(dead_code)]
- /// temp file parquet data is written to. The file is cleaned up
- /// when dropped
- file: NamedTempFile,
- provider: Arc<dyn TableProvider>,
- ctx: SessionContext,
-}
-
-/// The output of running one of the test cases
-struct TestOutput {
- /// The input string
- sql: String,
- /// Execution metrics for the Parquet Scan
- parquet_metrics: MetricsSet,
- /// number of rows in results
- result_rows: usize,
- /// the contents of the input, as a string
- pretty_input: String,
- /// the raw results, as a string
- pretty_results: String,
-}
-
-impl TestOutput {
- /// retrieve the value of the named metric, if any
- fn metric_value(&self, metric_name: &str) -> Option<usize> {
- self.parquet_metrics
- .sum(|metric| metric.value().name() == metric_name)
- .map(|v| v.as_usize())
- }
-
- /// The number of times the pruning predicate evaluation errors
- fn predicate_evaluation_errors(&self) -> Option<usize> {
- self.metric_value("predicate_evaluation_errors")
- }
-
- /// The number of times the pruning predicate evaluation errors
- fn row_groups_pruned(&self) -> Option<usize> {
- self.metric_value("row_groups_pruned")
- }
-
- fn description(&self) -> String {
- format!(
- "Input:\n{}\nQuery:\n{}\nOutput:\n{}\nMetrics:\n{}",
- self.pretty_input, self.sql, self.pretty_results, self.parquet_metrics,
- )
- }
-}
-
-/// Creates an execution context that has an external table "t"
-/// registered pointing at a parquet file made with `make_test_file`
-/// and the appropriate scenario
-impl ContextWithParquet {
- async fn new(scenario: Scenario) -> Self {
- Self::with_config(scenario, SessionConfig::new()).await
- }
-
- async fn with_config(scenario: Scenario, config: SessionConfig) -> Self {
- let file = make_test_file(scenario).await;
- let parquet_path = file.path().to_string_lossy();
-
- // now, setup a the file as a data source and run a query against it
- let ctx = SessionContext::with_config(config);
-
- ctx.register_parquet("t", &parquet_path, ParquetReadOptions::default())
- .await
- .unwrap();
- let provider = ctx.deregister_table("t").unwrap().unwrap();
- ctx.register_table("t", provider.clone()).unwrap();
-
- Self {
- file,
- provider,
- ctx,
- }
- }
-
- /// runs a query like "SELECT * from t WHERE <expr> and returns
- /// the number of output rows and normalized execution metrics
- async fn query_with_expr(&mut self, expr: Expr) -> TestOutput {
- let sql = format!("EXPR only: {:?}", expr);
- let logical_plan = LogicalPlanBuilder::scan(
- "t",
- provider_as_source(self.provider.clone()),
- None,
- )
- .unwrap()
- .filter(expr)
- .unwrap()
- .build()
- .unwrap();
- self.run_test(logical_plan, sql).await
- }
-
- /// Runs the specified SQL query and returns the number of output
- /// rows and normalized execution metrics
- async fn query(&mut self, sql: &str) -> TestOutput {
- println!("Planning sql {}", sql);
- let logical_plan = self
- .ctx
- .sql(sql)
- .await
- .expect("planning")
- .to_unoptimized_plan();
- self.run_test(logical_plan, sql).await
- }
-
- /// runs the logical plan
- async fn run_test(
- &mut self,
- logical_plan: LogicalPlan,
- sql: impl Into<String>,
- ) -> TestOutput {
- let input = self
- .ctx
- .sql("SELECT * from t")
- .await
- .expect("planning")
- .collect()
- .await
- .expect("getting input");
- let pretty_input = pretty_format_batches(&input).unwrap().to_string();
-
- let logical_plan = self.ctx.optimize(&logical_plan).expect("optimizing plan");
-
- let physical_plan = self
- .ctx
- .create_physical_plan(&logical_plan)
- .await
- .expect("creating physical plan");
-
- let task_ctx = self.ctx.task_ctx();
- let results = datafusion::physical_plan::collect(physical_plan.clone(), task_ctx)
- .await
- .expect("Running");
-
- // find the parquet metrics
- struct MetricsFinder {
- metrics: Option<MetricsSet>,
- }
- impl ExecutionPlanVisitor for MetricsFinder {
- type Error = std::convert::Infallible;
- fn pre_visit(
- &mut self,
- plan: &dyn ExecutionPlan,
- ) -> Result<bool, Self::Error> {
- if plan.as_any().downcast_ref::<ParquetExec>().is_some() {
- self.metrics = plan.metrics();
- }
- // stop searching once we have found the metrics
- Ok(self.metrics.is_none())
- }
- }
- let mut finder = MetricsFinder { metrics: None };
- accept(physical_plan.as_ref(), &mut finder).unwrap();
- let parquet_metrics = finder.metrics.unwrap();
-
- let result_rows = results.iter().map(|b| b.num_rows()).sum();
-
- let pretty_results = pretty_format_batches(&results).unwrap().to_string();
-
- let sql = sql.into();
- TestOutput {
- sql,
- parquet_metrics,
- result_rows,
- pretty_input,
- pretty_results,
- }
- }
-}
-
-/// Create a test parquet file with varioud data types
-async fn make_test_file(scenario: Scenario) -> NamedTempFile {
- let mut output_file = tempfile::Builder::new()
- .prefix("parquet_pruning")
- .suffix(".parquet")
- .tempfile()
- .expect("tempfile creation");
-
- let props = WriterProperties::builder()
- .set_max_row_group_size(5)
- .build();
-
- let batches = match scenario {
- Scenario::Timestamps => {
- vec![
- make_timestamp_batch(Duration::seconds(0)),
- make_timestamp_batch(Duration::seconds(10)),
- make_timestamp_batch(Duration::minutes(10)),
- make_timestamp_batch(Duration::days(10)),
- ]
- }
- Scenario::Dates => {
- vec![
- make_date_batch(Duration::days(0)),
- make_date_batch(Duration::days(10)),
- make_date_batch(Duration::days(300)),
- make_date_batch(Duration::days(3600)),
- ]
- }
- Scenario::Int32 => {
- vec![
- make_int32_batch(-5, 0),
- make_int32_batch(-4, 1),
- make_int32_batch(0, 5),
- make_int32_batch(5, 10),
- ]
- }
- Scenario::Float64 => {
- vec![
- make_f64_batch(vec![-5.0, -4.0, -3.0, -2.0, -1.0]),
- make_f64_batch(vec![-4.0, -3.0, -2.0, -1.0, 0.0]),
- make_f64_batch(vec![0.0, 1.0, 2.0, 3.0, 4.0]),
- make_f64_batch(vec![5.0, 6.0, 7.0, 8.0, 9.0]),
- ]
- }
- Scenario::Decimal => {
- // decimal record batch
- vec![
- make_decimal_batch(vec![100, 200, 300, 400, 600], 9, 2),
- make_decimal_batch(vec![-500, 100, 300, 400, 600], 9, 2),
- make_decimal_batch(vec![2000, 3000, 3000, 4000, 6000], 9, 2),
- ]
- }
- Scenario::DecimalLargePrecision => {
- // decimal record batch with large precision,
- // and the data will stored as FIXED_LENGTH_BYTE_ARRAY
- vec![
- make_decimal_batch(vec![100, 200, 300, 400, 600], 38, 2),
- make_decimal_batch(vec![-500, 100, 300, 400, 600], 38, 2),
- make_decimal_batch(vec![2000, 3000, 3000, 4000, 6000], 38, 2),
- ]
- }
- };
-
- let schema = batches[0].schema();
-
- let mut writer = ArrowWriter::try_new(&mut output_file, schema, Some(props)).unwrap();
-
- for batch in batches {
- writer.write(&batch).expect("writing batch");
- }
- writer.close().unwrap();
-
- output_file
-}
-
-/// Return record batch with a few rows of data for all of the supported timestamp types
-/// values with the specified offset
-///
-/// Columns are named:
-/// "nanos" --> TimestampNanosecondArray
-/// "micros" --> TimestampMicrosecondArray
-/// "millis" --> TimestampMillisecondArray
-/// "seconds" --> TimestampSecondArray
-/// "names" --> StringArray
-fn make_timestamp_batch(offset: Duration) -> RecordBatch {
- let ts_strings = vec![
- Some("2020-01-01T01:01:01.0000000000001"),
- Some("2020-01-01T01:02:01.0000000000001"),
- Some("2020-01-01T02:01:01.0000000000001"),
- None,
- Some("2020-01-02T01:01:01.0000000000001"),
- ];
-
- let offset_nanos = offset.num_nanoseconds().expect("non overflow nanos");
-
- let ts_nanos = ts_strings
- .into_iter()
- .map(|t| {
- t.map(|t| {
- offset_nanos
- + t.parse::<chrono::NaiveDateTime>()
- .unwrap()
- .timestamp_nanos()
- })
- })
- .collect::<Vec<_>>();
-
- let ts_micros = ts_nanos
- .iter()
- .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000))
- .collect::<Vec<_>>();
-
- let ts_millis = ts_nanos
- .iter()
- .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000))
- .collect::<Vec<_>>();
-
- let ts_seconds = ts_nanos
- .iter()
- .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000000))
- .collect::<Vec<_>>();
-
- let names = ts_nanos
- .iter()
- .enumerate()
- .map(|(i, _)| format!("Row {} + {}", i, offset))
- .collect::<Vec<_>>();
-
- let arr_nanos = TimestampNanosecondArray::from(ts_nanos);
- let arr_micros = TimestampMicrosecondArray::from(ts_micros);
- let arr_millis = TimestampMillisecondArray::from(ts_millis);
- let arr_seconds = TimestampSecondArray::from(ts_seconds);
-
- let names = names.iter().map(|s| s.as_str()).collect::<Vec<_>>();
- let arr_names = StringArray::from(names);
-
- let schema = Schema::new(vec![
- Field::new("nanos", arr_nanos.data_type().clone(), true),
- Field::new("micros", arr_micros.data_type().clone(), true),
- Field::new("millis", arr_millis.data_type().clone(), true),
- Field::new("seconds", arr_seconds.data_type().clone(), true),
- Field::new("name", arr_names.data_type().clone(), true),
- ]);
- let schema = Arc::new(schema);
-
- RecordBatch::try_new(
- schema,
- vec![
- Arc::new(arr_nanos),
- Arc::new(arr_micros),
- Arc::new(arr_millis),
- Arc::new(arr_seconds),
- Arc::new(arr_names),
- ],
- )
- .unwrap()
-}
-
-/// Return record batch with i32 sequence
-///
-/// Columns are named
-/// "i" -> Int32Array
-fn make_int32_batch(start: i32, end: i32) -> RecordBatch {
- let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
- let v: Vec<i32> = (start..end).collect();
- let array = Arc::new(Int32Array::from(v)) as ArrayRef;
- RecordBatch::try_new(schema, vec![array.clone()]).unwrap()
-}
-
-/// Return record batch with f64 vector
-///
-/// Columns are named
-/// "f" -> Float64Array
-fn make_f64_batch(v: Vec<f64>) -> RecordBatch {
- let schema = Arc::new(Schema::new(vec![Field::new("f", DataType::Float64, true)]));
- let array = Arc::new(Float64Array::from(v)) as ArrayRef;
- RecordBatch::try_new(schema, vec![array.clone()]).unwrap()
-}
-
-/// Return record batch with decimal vector
-///
-/// Columns are named
-/// "decimal_col" -> DecimalArray
-fn make_decimal_batch(v: Vec<i128>, precision: u8, scale: u8) -> RecordBatch {
- let schema = Arc::new(Schema::new(vec![Field::new(
- "decimal_col",
- DataType::Decimal128(precision, scale),
- true,
- )]));
- let array = Arc::new(
- Decimal128Array::from_iter_values(v)
- .with_precision_and_scale(precision, scale)
- .unwrap(),
- ) as ArrayRef;
- RecordBatch::try_new(schema, vec![array.clone()]).unwrap()
-}
-
-/// Return record batch with a few rows of data for all of the supported date
-/// types with the specified offset (in days)
-///
-/// Columns are named:
-/// "date32" --> Date32Array
-/// "date64" --> Date64Array
-/// "names" --> StringArray
-fn make_date_batch(offset: Duration) -> RecordBatch {
- let date_strings = vec![
- Some("2020-01-01"),
- Some("2020-01-02"),
- Some("2020-01-03"),
- None,
- Some("2020-01-04"),
- ];
-
- let names = date_strings
- .iter()
- .enumerate()
- .map(|(i, val)| format!("Row {} + {}: {:?}", i, offset, val))
- .collect::<Vec<_>>();
-
- // Copied from `cast.rs` cast kernel due to lack of temporal kernels
- // https://github.com/apache/arrow-rs/issues/527
- const EPOCH_DAYS_FROM_CE: i32 = 719_163;
-
- let date_seconds = date_strings
- .iter()
- .map(|t| {
- t.map(|t| {
- let t = t.parse::<chrono::NaiveDate>().unwrap();
- let t = t + offset;
- t.num_days_from_ce() - EPOCH_DAYS_FROM_CE
- })
- })
- .collect::<Vec<_>>();
-
- let date_millis = date_strings
- .into_iter()
- .map(|t| {
- t.map(|t| {
- let t = t
- .parse::<chrono::NaiveDate>()
- .unwrap()
- .and_time(chrono::NaiveTime::from_hms_opt(0, 0, 0).unwrap());
- let t = t + offset;
- t.timestamp_millis()
- })
- })
- .collect::<Vec<_>>();
-
- let arr_date32 = Date32Array::from(date_seconds);
- let arr_date64 = Date64Array::from(date_millis);
-
- let names = names.iter().map(|s| s.as_str()).collect::<Vec<_>>();
- let arr_names = StringArray::from(names);
-
- let schema = Schema::new(vec![
- Field::new("date32", arr_date32.data_type().clone(), true),
- Field::new("date64", arr_date64.data_type().clone(), true),
- Field::new("name", arr_names.data_type().clone(), true),
- ]);
- let schema = Arc::new(schema);
-
- RecordBatch::try_new(
- schema,
- vec![
- Arc::new(arr_date32),
- Arc::new(arr_date64),
- Arc::new(arr_names),
- ],
- )
- .unwrap()
-}
diff --git a/test-utils/src/data_gen.rs b/test-utils/src/data_gen.rs
index adff4a514..c82d56ef2 100644
--- a/test-utils/src/data_gen.rs
+++ b/test-utils/src/data_gen.rs
@@ -19,8 +19,8 @@ use std::ops::Range;
use std::sync::Arc;
use arrow::array::{
- Int32Builder, StringBuilder, StringDictionaryBuilder, TimestampNanosecondBuilder,
- UInt16Builder,
+ Decimal128Builder, Int32Builder, StringBuilder, StringDictionaryBuilder,
+ TimestampNanosecondBuilder, UInt16Builder,
};
use arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef, TimeUnit};
use arrow::record_batch::RecordBatch;
@@ -43,6 +43,7 @@ struct BatchBuilder {
request_bytes: Int32Builder,
response_bytes: Int32Builder,
response_status: UInt16Builder,
+ prices_status: Decimal128Builder,
/// optional number of rows produced
row_limit: Option<usize>,
@@ -73,6 +74,7 @@ impl BatchBuilder {
Field::new("request_bytes", DataType::Int32, true),
Field::new("response_bytes", DataType::Int32, true),
Field::new("response_status", DataType::UInt16, false),
+ Field::new("decimal_price", DataType::Decimal128(38, 0), false),
]))
}
@@ -146,6 +148,7 @@ impl BatchBuilder {
.append_option(rng.gen_bool(0.9).then(|| rng.gen()));
self.response_status
.append_value(status[rng.gen_range(0..status.len())]);
+ self.prices_status.append_value(self.row_count as i128);
}
fn finish(mut self, schema: SchemaRef) -> RecordBatch {
@@ -166,6 +169,12 @@ impl BatchBuilder {
Arc::new(self.request_bytes.finish()),
Arc::new(self.response_bytes.finish()),
Arc::new(self.response_status.finish()),
+ Arc::new(
+ self.prices_status
+ .finish()
+ .with_precision_and_scale(38, 0)
+ .unwrap(),
+ ),
],
)
.unwrap()
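[Editor's note: the generator appends the 1-based row count as the price (the
builder field is named `prices_status` while the schema column is
`decimal_price`; the mismatch is in the commit as merged), which produces the
monotone 1..53819 page statistics used by the filter_pushdown test. A minimal
sketch of the builder pattern, assuming the arrow version this commit builds
against:

    use arrow::array::Decimal128Builder;

    fn main() {
        // Append raw i128 values, then attach precision/scale when finishing,
        // as data_gen.rs does above.
        let mut b = Decimal128Builder::new();
        for row in 1..=5i128 {
            b.append_value(row); // decimal(38, 0): the raw value is the price
        }
        let arr = b.finish().with_precision_and_scale(38, 0).unwrap();
        assert_eq!(arr.len(), 5);
    }
]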