Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/08 11:12:14 UTC
[arrow-datafusion] branch master updated: Add parquet predicate pushdown tests with smaller pages (#4131)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 175adbda6 Add parquet predicate pushdown tests with smaller pages (#4131)
175adbda6 is described below
commit 175adbda616ca2f339cfba375c1f86535f6e2ad1
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Tue Nov 8 06:12:09 2022 -0500
Add parquet predicate pushdown tests with smaller pages (#4131)
---
benchmarks/src/bin/parquet_filter_pushdown.rs | 36 ++---
datafusion/core/src/physical_optimizer/pruning.rs | 7 +
.../file_format/parquet/page_filter.rs | 13 +-
datafusion/core/tests/parquet_filter_pushdown.rs | 158 ++++++++++++++++++---
parquet-test-utils/src/lib.rs | 26 +---
5 files changed, 182 insertions(+), 58 deletions(-)
diff --git a/benchmarks/src/bin/parquet_filter_pushdown.rs b/benchmarks/src/bin/parquet_filter_pushdown.rs
index 4ec2dc90c..a3ff7bee5 100644
--- a/benchmarks/src/bin/parquet_filter_pushdown.rs
+++ b/benchmarks/src/bin/parquet_filter_pushdown.rs
@@ -21,6 +21,7 @@ use datafusion::logical_expr::{lit, or, Expr};
use datafusion::optimizer::utils::disjunction;
use datafusion::physical_plan::collect;
use datafusion::prelude::{col, SessionConfig, SessionContext};
+use parquet::file::properties::WriterProperties;
use parquet_test_utils::{ParquetScanOptions, TestParquetFile};
use std::path::PathBuf;
use std::time::Instant;
@@ -73,7 +74,19 @@ async fn main() -> Result<()> {
let path = opt.path.join("logs.parquet");
- let test_file = gen_data(path, opt.scale_factor, opt.page_size, opt.row_group_size)?;
+ let mut props_builder = WriterProperties::builder();
+
+ if let Some(s) = opt.page_size {
+ props_builder = props_builder
+ .set_data_pagesize_limit(s)
+ .set_write_batch_size(s);
+ }
+
+ if let Some(s) = opt.row_group_size {
+ props_builder = props_builder.set_max_row_group_size(s);
+ }
+
+ let test_file = gen_data(path, opt.scale_factor, props_builder.build())?;
run_benchmarks(&mut ctx, &test_file, opt.iterations, opt.debug).await?;
@@ -137,14 +150,9 @@ async fn run_benchmarks(
println!("Using scan options {:?}", scan_options);
for i in 0..iterations {
let start = Instant::now();
- let rows = exec_scan(
- ctx,
- test_file,
- filter_expr.clone(),
- scan_options.clone(),
- debug,
- )
- .await?;
+ let rows =
+ exec_scan(ctx, test_file, filter_expr.clone(), *scan_options, debug)
+ .await?;
println!(
"Iteration {} returned {} rows in {} ms",
i,
@@ -179,17 +187,11 @@ async fn exec_scan(
fn gen_data(
path: PathBuf,
scale_factor: f32,
- page_size: Option<usize>,
- row_group_size: Option<usize>,
+ props: WriterProperties,
) -> Result<TestParquetFile> {
let generator = AccessLogGenerator::new();
let num_batches = 100_f32 * scale_factor;
- TestParquetFile::try_new(
- path,
- generator.take(num_batches as usize),
- page_size,
- row_group_size,
- )
+ TestParquetFile::try_new(path, props, generator.take(num_batches as usize))
}
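
For context, here is a minimal standalone sketch of the WriterProperties builder pattern the benchmark now uses (it mirrors the patch above; the option values are whatever the caller supplies):

    use parquet::file::properties::WriterProperties;

    fn build_props(page_size: Option<usize>, row_group_size: Option<usize>) -> WriterProperties {
        let mut builder = WriterProperties::builder();
        if let Some(s) = page_size {
            // bound both the on-disk data page size and the writer batch size
            builder = builder.set_data_pagesize_limit(s).set_write_batch_size(s);
        }
        if let Some(s) = row_group_size {
            builder = builder.set_max_row_group_size(s);
        }
        builder.build()
    }

Moving the builder out to the caller lets each benchmark or test pick its own page and row group layout without growing the TestParquetFile::try_new signature.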
diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs
index 3adf0ad7b..a47815377 100644
--- a/datafusion/core/src/physical_optimizer/pruning.rs
+++ b/datafusion/core/src/physical_optimizer/pruning.rs
@@ -51,6 +51,7 @@ use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter};
use datafusion_expr::utils::expr_to_columns;
use datafusion_expr::{binary_expr, cast, try_cast, ExprSchemable};
use datafusion_physical_expr::create_physical_expr;
+use log::trace;
/// Interface to pass statistics information to [`PruningPredicate`]
///
@@ -415,6 +416,12 @@ fn build_statistics_record_batch<S: PruningStatistics>(
let mut options = RecordBatchOptions::default();
options.row_count = Some(statistics.num_containers());
+ trace!(
+ "Creating statistics batch for {:#?} with {:#?}",
+ required_columns,
+ arrays
+ );
+
RecordBatch::try_new_with_options(schema, arrays, &options).map_err(|err| {
DataFusionError::Plan(format!("Can not create statistics record batch: {}", err))
})
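
The new trace! output is only emitted when a logger is installed. A minimal sketch of how it can be surfaced (this mirrors the env_logger setup added to the test file below; the crate dependencies on log and env_logger and the RUST_LOG value are assumptions):

    use log::trace;

    fn main() {
        // run with e.g. RUST_LOG=datafusion=trace to see the message
        let _ = env_logger::try_init();
        let required_columns = vec!["c1_min", "c1_max"];
        let arrays = vec![1_i64, 2, 3];
        trace!(
            "Creating statistics batch for {:#?} with {:#?}",
            required_columns,
            arrays
        );
    }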
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
index 828d21375..95c93151a 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
@@ -22,7 +22,7 @@ use arrow::{array::ArrayRef, datatypes::SchemaRef, error::ArrowError};
use datafusion_common::{Column, DataFusionError, Result};
use datafusion_expr::utils::expr_to_columns;
use datafusion_optimizer::utils::split_conjunction;
-use log::{debug, error};
+use log::{debug, error, trace};
use parquet::{
arrow::arrow_reader::{RowSelection, RowSelector},
errors::ParquetError,
@@ -143,6 +143,9 @@ pub(crate) fn build_page_filter(
}),
);
} else {
+ trace!(
+ "Did not have enough metadata to prune with page indexes, falling back, falling back to all rows",
+ );
// fallback select all rows
let all_selected =
vec![RowSelector::select(groups[*r].num_rows() as usize)];
@@ -150,8 +153,9 @@ pub(crate) fn build_page_filter(
}
}
debug!(
- "Use filter and page index create RowSelection {:?} from predicate:{:?}",
- &selectors, predicate
+ "Use filter and page index create RowSelection {:?} from predicate: {:?}",
+ &selectors,
+ predicate.predicate_expr(),
);
row_selections.push_back(selectors.into_iter().flatten().collect::<Vec<_>>());
}
@@ -321,7 +325,7 @@ fn prune_pages_in_one_row_group(
assert_eq!(row_vec.len(), values.len());
let mut sum_row = *row_vec.first().unwrap();
let mut selected = *values.first().unwrap();
-
+ trace!("Pruned to to {:?} using {:?}", values, pruning_stats);
for (i, &f) in values.iter().skip(1).enumerate() {
if f == selected {
sum_row += *row_vec.get(i).unwrap();
@@ -376,6 +380,7 @@ fn create_row_count_in_each_page(
/// Wraps one column's page_index within one row group's statistics in a way
/// that implements [`PruningStatistics`]
+#[derive(Debug)]
struct PagesPruningStatistics<'a> {
col_page_indexes: &'a Index,
col_offset_indexes: &'a Vec<PageLocation>,
diff --git a/datafusion/core/tests/parquet_filter_pushdown.rs b/datafusion/core/tests/parquet_filter_pushdown.rs
index 2c0d75cc2..54b7d8d16 100644
--- a/datafusion/core/tests/parquet_filter_pushdown.rs
+++ b/datafusion/core/tests/parquet_filter_pushdown.rs
@@ -30,12 +30,12 @@ use std::time::Instant;
use arrow::compute::concat_batches;
use arrow::record_batch::RecordBatch;
-use datafusion::logical_expr::{lit, Expr};
use datafusion::physical_plan::collect;
use datafusion::physical_plan::metrics::MetricsSet;
-use datafusion::prelude::{col, SessionContext};
+use datafusion::prelude::{col, lit, lit_timestamp_nano, Expr, SessionContext};
use datafusion_optimizer::utils::{conjunction, disjunction, split_conjunction};
use itertools::Itertools;
+use parquet::file::properties::WriterProperties;
use parquet_test_utils::{ParquetScanOptions, TestParquetFile};
use tempfile::TempDir;
use test_utils::AccessLogGenerator;
@@ -43,24 +43,29 @@ use test_utils::AccessLogGenerator;
/// how many rows of generated data to write to our parquet file (arbitrary)
const NUM_ROWS: usize = 53819;
+#[cfg(test)]
+#[ctor::ctor]
+fn init() {
+ // enable logging so RUST_LOG works
+ let _ = env_logger::try_init();
+}
+
#[cfg(not(target_family = "windows"))]
#[tokio::test]
async fn single_file() {
// Only create the parquet file once as it is fairly large
+
let tempdir = TempDir::new().unwrap();
let generator = AccessLogGenerator::new().with_row_limit(Some(NUM_ROWS));
- // TODO: set the max page rows with some various / arbitrary sizes 8311
- // (using https://github.com/apache/arrow-rs/issues/2941) to ensure we get multiple pages
- let page_size = None;
- let row_group_size = None;
+ // default properties
+ let props = WriterProperties::builder().build();
let file = tempdir.path().join("data.parquet");
let start = Instant::now();
println!("Writing test data to {:?}", file);
- let test_parquet_file =
- TestParquetFile::try_new(file, generator, page_size, row_group_size).unwrap();
+ let test_parquet_file = TestParquetFile::try_new(file, props, generator).unwrap();
println!(
"Completed generating test data in {:?}",
Instant::now() - start
@@ -225,6 +230,77 @@ async fn single_file() {
.run()
.await;
}
+
+#[cfg(not(target_family = "windows"))]
+#[tokio::test]
+async fn single_file_small_data_pages() {
+ let tempdir = TempDir::new().unwrap();
+
+ let generator = AccessLogGenerator::new().with_row_limit(Some(NUM_ROWS));
+
+ // set the max rows per data page to the arbitrary value 8311 to
+ // increase the effectiveness of page filtering
+ let props = WriterProperties::builder()
+ .set_data_page_row_count_limit(8311)
+ .build();
+ let file = tempdir.path().join("data_8311.parquet");
+
+ let start = Instant::now();
+ println!("Writing test data to {:?}", file);
+ let test_parquet_file = TestParquetFile::try_new(file, props, generator).unwrap();
+ println!(
+ "Completed generating test data in {:?}",
+ Instant::now() - start
+ );
+
+ // The statistics on the 'pod' column are as follows:
+ //
+ // parquet-tools dump -d ~/Downloads/data_8311.parquet
+ //
+ // ...
+ // pod TV=53819 RL=0 DL=0 DS: 8 DE:PLAIN
+ // ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ // page 0: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: aqcathnxqsphdhgjtgvxsfyiwbmhlmg, max: bvjjmytpfzdfsvlzfhbunasihjgxpesbmxv, num_nulls not defined] CRC:[none] SZ:7 VC:9216
+ // page 1: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: bvjjmytpfzdfsvlzfhbunasihjgxpesbmxv, max: bxyubzxbbmhroqhrdzttngxcpwwgkpaoizvgzd, num_nulls not defined] CRC:[none] SZ:7 VC:9216
+ // page 2: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: bxyubzxbbmhroqhrdzttngxcpwwgkpaoizvgzd, max: djzdyiecnumrsrcbizwlqzdhnpoiqdh, num_nulls not defined] CRC:[none] SZ:10 VC:9216
+ // page 3: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: djzdyiecnumrsrcbizwlqzdhnpoiqdh, max: fktdcgtmzvoedpwhfevcvvrtaurzgex, num_nulls not defined] CRC:[none] SZ:7 VC:9216
+ // page 4: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: fktdcgtmzvoedpwhfevcvvrtaurzgex, max: fwtdpgtxwqkkgtgvthhwycrvjiizdifyp, num_nulls not defined] CRC:[none] SZ:7 VC:9216
+ // page 5: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: fwtdpgtxwqkkgtgvthhwycrvjiizdifyp, max: iadnalqpdzthpifrvewossmpqibgtsuin, num_nulls not defined] CRC:[none] SZ:7 VC:7739
+ //
+ // This test currently fails due to https://github.com/apache/arrow-datafusion/issues/3833
+ // (page index pruning not implemented for byte array)
+
+ // TestCase::new(&test_parquet_file)
+ // .with_name("selective")
+ // // predicate is chosen carefully to prune pages 0, 1, 2, 3, 4
+ // // pod = 'iadnalqpdzthpifrvewossmpqibgtsuin'
+ // .with_filter(col("pod").eq(lit("iadnalqpdzthpifrvewossmpqibgtsuin")))
+ // .with_pushdown_expected(PushdownExpected::Some)
+ // .with_page_index_filtering_expected(PageIndexFilteringExpected::Some)
+ // .with_expected_rows(2574)
+ // .run()
+ // .await;
+
+ // time TV=53819 RL=0 DL=0 DS: 7092 DE:PLAIN
+ // --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ // page 0: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.004133888, num_nulls not defined] CRC:[none] SZ:13844 VC:9216
+ // page 1: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.006397952, num_nulls not defined] CRC:[none] SZ:14996 VC:9216
+ // page 2: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.005650432, num_nulls not defined] CRC:[none] SZ:14996 VC:9216
+ // page 3: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.004269056, num_nulls not defined] CRC:[none] SZ:14996 VC:9216
+ // page 4: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.007261184, num_nulls not defined] CRC:[none] SZ:14996 VC:9216
+ // page 5: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.005330944, num_nulls not defined] CRC:[none] SZ:12601 VC:7739
+ TestCase::new(&test_parquet_file)
+ .with_name("selective")
+ // predicate is chosen carefully to prune pages
+ // time > 1970-01-01T00:00:00.004300000
+ .with_filter(col("time").gt(lit_timestamp_nano(4300000)))
+ .with_pushdown_expected(PushdownExpected::Some)
+ .with_page_index_filtering_expected(PageIndexFilteringExpected::Some)
+ .with_expected_rows(9745)
+ .run()
+ .await;
+}
+
/// Expected pushdown behavior
#[derive(Debug, Clone, Copy)]
enum PushdownExpected {
@@ -234,6 +310,15 @@ enum PushdownExpected {
Some,
}
/// Expected page index filtering behavior
+#[derive(Debug, Clone, Copy)]
+enum PageIndexFilteringExpected {
+ /// Expected that no pages would be pruned
+ None,
+ /// Expected that more than 0 pages would be pruned
+ Some,
+}
+
/// parameters for running a test
struct TestCase<'a> {
test_parquet_file: &'a TestParquetFile,
@@ -243,6 +328,10 @@ struct TestCase<'a> {
filter: Expr,
/// Did we expect the pushdown filtering to have filtered any rows?
pushdown_expected: PushdownExpected,
+
+ /// Did we expect page filtering to filter out pages
+ page_index_filtering_expected: PageIndexFilteringExpected,
+
/// How many rows are expected to pass the predicate overall?
expected_rows: usize,
}
@@ -255,6 +344,7 @@ impl<'a> TestCase<'a> {
// default to a filter that passes everything
filter: lit(true),
pushdown_expected: PushdownExpected::None,
+ page_index_filtering_expected: PageIndexFilteringExpected::None,
expected_rows: 0,
}
}
@@ -271,8 +361,17 @@ impl<'a> TestCase<'a> {
}
/// Set the expected predicate pushdown
- fn with_pushdown_expected(mut self, pushdown_expected: PushdownExpected) -> Self {
- self.pushdown_expected = pushdown_expected;
+ fn with_pushdown_expected(mut self, v: PushdownExpected) -> Self {
+ self.pushdown_expected = v;
+ self
+ }
+
+ /// Set the expected page filtering
+ fn with_page_index_filtering_expected(
+ mut self,
+ v: PageIndexFilteringExpected,
+ ) -> Self {
+ self.page_index_filtering_expected = v;
self
}
@@ -307,8 +406,6 @@ impl<'a> TestCase<'a> {
reorder_filters: false,
enable_page_index: false,
},
- // always expect no pushdown
- PushdownExpected::None,
filter,
)
.await;
@@ -320,7 +417,6 @@ impl<'a> TestCase<'a> {
reorder_filters: false,
enable_page_index: false,
},
- self.pushdown_expected,
filter,
)
.await;
@@ -334,7 +430,6 @@ impl<'a> TestCase<'a> {
reorder_filters: true,
enable_page_index: false,
},
- self.pushdown_expected,
filter,
)
.await;
@@ -348,8 +443,6 @@ impl<'a> TestCase<'a> {
reorder_filters: false,
enable_page_index: true,
},
- // pushdown is off so no pushdown is expected
- PushdownExpected::None,
filter,
)
.await;
@@ -362,7 +455,6 @@ impl<'a> TestCase<'a> {
reorder_filters: true,
enable_page_index: true,
},
- self.pushdown_expected,
filter,
)
.await;
@@ -374,7 +466,6 @@ impl<'a> TestCase<'a> {
async fn read_with_options(
&self,
scan_options: ParquetScanOptions,
- pushdown_expected: PushdownExpected,
filter: &Expr,
) -> RecordBatch {
println!(" scan options: {scan_options:?}");
@@ -404,6 +495,14 @@ impl<'a> TestCase<'a> {
// verify expected pushdown
let metrics =
TestParquetFile::parquet_metrics(exec).expect("found parquet metrics");
+
+ let pushdown_expected = if scan_options.pushdown_filters {
+ self.pushdown_expected
+ } else {
+ // if filter pushdown is not enabled we don't expect it to filter rows
+ PushdownExpected::None
+ };
+
let pushdown_rows_filtered = get_value(&metrics, "pushdown_rows_filtered");
println!(" pushdown_rows_filtered: {}", pushdown_rows_filtered);
@@ -419,6 +518,29 @@ impl<'a> TestCase<'a> {
}
};
+ let page_index_rows_filtered = get_value(&metrics, "page_index_rows_filtered");
+ println!(" page_index_rows_filtered: {}", page_index_rows_filtered);
+
+ let page_index_filtering_expected = if scan_options.enable_page_index {
+ self.page_index_filtering_expected
+ } else {
+ // if page index filtering is not enabled, don't expect it
+ // to filter rows
+ PageIndexFilteringExpected::None
+ };
+
+ match page_index_filtering_expected {
+ PageIndexFilteringExpected::None => {
+ assert_eq!(page_index_rows_filtered, 0);
+ }
+ PageIndexFilteringExpected::Some => {
+ assert!(
+ page_index_rows_filtered > 0,
+ "Expected to filter rows via page index but none were",
+ );
+ }
+ };
+
batch
}
}
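
For reference, a minimal sketch of how the new test's predicate is assembled (lit_timestamp_nano produces a nanosecond-precision timestamp literal; the 4_300_000 ns cutoff comes from the page statistics quoted in the test above):

    use datafusion::prelude::{col, lit_timestamp_nano, Expr};

    fn time_filter() -> Expr {
        // time > 1970-01-01T00:00:00.004300000
        col("time").gt(lit_timestamp_nano(4_300_000))
    }

Pages whose time max is at or below that cutoff can then be skipped entirely via the page index.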
diff --git a/parquet-test-utils/src/lib.rs b/parquet-test-utils/src/lib.rs
index 6a3e6e0ec..41066584e 100644
--- a/parquet-test-utils/src/lib.rs
+++ b/parquet-test-utils/src/lib.rs
@@ -51,7 +51,7 @@ pub struct TestParquetFile {
object_meta: ObjectMeta,
}
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Copy)]
pub struct ParquetScanOptions {
pub pushdown_filters: bool,
pub reorder_filters: bool,
@@ -59,34 +59,20 @@ pub struct ParquetScanOptions {
}
impl TestParquetFile {
- /// Creates a new parquet file at the specified location
+ /// Creates a new parquet file at the specified location with the
+ /// given properties
pub fn try_new(
path: PathBuf,
+ props: WriterProperties,
batches: impl IntoIterator<Item = RecordBatch>,
- page_size: Option<usize>,
- row_group_size: Option<usize>,
) -> Result<Self> {
let file = File::create(&path).unwrap();
- let mut props_builder = WriterProperties::builder();
-
- if let Some(s) = page_size {
- props_builder = props_builder
- .set_data_pagesize_limit(s)
- .set_write_batch_size(s);
- }
-
- if let Some(s) = row_group_size {
- props_builder = props_builder.set_max_row_group_size(s);
- }
-
let mut batches = batches.into_iter();
let first_batch = batches.next().expect("need at least one record batch");
let schema = first_batch.schema();
- let mut writer =
- ArrowWriter::try_new(file, schema.clone(), Some(props_builder.build()))
- .unwrap();
+ let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props)).unwrap();
writer.write(&first_batch).unwrap();
writer.flush()?;
@@ -122,7 +108,9 @@ impl TestParquetFile {
object_meta,
})
}
+}
+impl TestParquetFile {
/// return a `ParquetExec` and `FilterExec` with the specified options to scan this parquet file.
///
/// This returns the same plan that DataFusion will make with a pushed down predicate followed by a filter:
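
With this change, callers construct WriterProperties themselves and hand them to TestParquetFile::try_new. A minimal sketch of the new calling convention (the 8311-row page limit matches the new test; using DataFusion's Result type here is an assumption):

    use std::path::PathBuf;

    use arrow::record_batch::RecordBatch;
    use datafusion_common::Result;
    use parquet::file::properties::WriterProperties;
    use parquet_test_utils::TestParquetFile;

    fn make_test_file(
        path: PathBuf,
        batches: impl IntoIterator<Item = RecordBatch>,
    ) -> Result<TestParquetFile> {
        // cap each data page at 8311 rows so the file contains many small pages
        let props = WriterProperties::builder()
            .set_data_page_row_count_limit(8311)
            .build();
        TestParquetFile::try_new(path, props, batches)
    }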