Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/07 14:29:27 UTC
[arrow-datafusion] branch master updated: Add metrics for parquet page level skipping (#4105)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 4d23cae48 Add metrics for parquet page level skipping (#4105)
4d23cae48 is described below
commit 4d23cae481c8e04fead1f1569dab0dc28b477008
Author: Yang Jiang <ya...@ebay.com>
AuthorDate: Mon Nov 7 22:29:21 2022 +0800
Add metrics for parquet page level skipping (#4105)
* Add statistics for parquet page level skipping
Signed-off-by: yangjiang <ya...@ebay.com>
* use page row limit to fix unit test
Signed-off-by: yangjiang <ya...@ebay.com>
* fix.
Signed-off-by: yangjiang <ya...@ebay.com>
* remove todo
Signed-off-by: yangjiang <ya...@ebay.com>
Signed-off-by: yangjiang <ya...@ebay.com>
---
.../core/src/datasource/file_format/parquet.rs | 58 +++++++++++++--------
.../core/src/physical_plan/file_format/parquet.rs | 59 ++++++++++++++++++----
.../physical_plan/file_format/parquet/metrics.rs | 13 +++++
.../file_format/parquet/page_filter.rs | 14 +++++
4 files changed, 115 insertions(+), 29 deletions(-)
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 07819bdf5..3de58d456 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -532,25 +532,43 @@ pub(crate) mod test_util {
pub async fn store_parquet(
batches: Vec<RecordBatch>,
+ multi_page: bool,
) -> Result<(Vec<ObjectMeta>, Vec<NamedTempFile>)> {
- let files: Vec<_> = batches
- .into_iter()
- .map(|batch| {
- let mut output = NamedTempFile::new().expect("creating temp file");
-
- let props = WriterProperties::builder().build();
- let mut writer =
- ArrowWriter::try_new(&mut output, batch.schema(), Some(props))
- .expect("creating writer");
-
- writer.write(&batch).expect("Writing batch");
- writer.close().unwrap();
- output
- })
- .collect();
-
- let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect();
- Ok((meta, files))
+ if multi_page {
+ // All batches are written into one file; each batch must have the same schema.
+ let mut output = NamedTempFile::new().expect("creating temp file");
+ let mut builder = WriterProperties::builder();
+ builder = builder.set_data_page_row_count_limit(2);
+ let proper = builder.build();
+ let mut writer =
+ ArrowWriter::try_new(&mut output, batches[0].schema(), Some(proper))
+ .expect("creating writer");
+ for b in batches {
+ writer.write(&b).expect("Writing batch");
+ }
+ writer.close().unwrap();
+ Ok((vec![local_unpartitioned_file(&output)], vec![output]))
+ } else {
+ // Each batch is written to its own file
+ let files: Vec<_> = batches
+ .into_iter()
+ .map(|batch| {
+ let mut output = NamedTempFile::new().expect("creating temp file");
+
+ let props = WriterProperties::builder().build();
+ let mut writer =
+ ArrowWriter::try_new(&mut output, batch.schema(), Some(props))
+ .expect("creating writer");
+
+ writer.write(&batch).expect("Writing batch");
+ writer.close().unwrap();
+ output
+ })
+ .collect();
+
+ let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect();
+ Ok((meta, files))
+ }
}
}
@@ -599,7 +617,7 @@ mod tests {
let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)]).unwrap();
let store = Arc::new(LocalFileSystem::new()) as _;
- let (meta, _files) = store_parquet(vec![batch1, batch2]).await?;
+ let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?;
let format = ParquetFormat::default();
let schema = format.infer_schema(&store, &meta).await.unwrap();
@@ -738,7 +756,7 @@ mod tests {
let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
LocalFileSystem::new(),
)));
- let (meta, _files) = store_parquet(vec![batch1, batch2]).await?;
+ let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?;
// Use a size hint larger than the parquet footer but smaller than the actual metadata, requiring a second fetch
// for the remaining metadata
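For context, the new multi_page branch of store_parquet above leans on the parquet writer's data page row count limit to force several small pages per column chunk. A minimal, standalone sketch of that setup, assuming the arrow and parquet crates (the two-row limit mirrors the test; the output path is illustrative, not from the commit):

    use std::sync::Arc;

    use arrow::array::Int32Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use parquet::arrow::ArrowWriter;
    use parquet::file::properties::WriterProperties;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let schema = Arc::new(Schema::new(vec![Field::new("int", DataType::Int32, true)]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6]))],
        )?;

        // Start a new data page every 2 rows, so even a tiny batch yields
        // several pages, and therefore page-level statistics to prune on.
        let props = WriterProperties::builder()
            .set_data_page_row_count_limit(2)
            .build();

        let file = std::fs::File::create("/tmp/multi_page.parquet")?;
        let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
        writer.write(&batch)?;
        writer.close()?;
        Ok(())
    }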
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 1f98dd88c..61d2e5bad 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -445,7 +445,7 @@ impl FileOpener for ParquetOpener {
// page index pruning: if all data on individual pages can
be ruled out using page metadata, rows from other columns
// with that range can be skipped as well
- if let Some(row_selection) = enable_page_index
+ if let Some(row_selection) = (enable_page_index && !row_groups.is_empty())
.then(|| {
page_filter::build_page_filter(
pruning_predicate.as_ref(),
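The guard added here uses Rust's bool::then, so the closure that builds the page filter only runs when page index pruning is enabled and at least one row group survived row group pruning. A minimal illustration of the gating pattern (plain Rust, not DataFusion's actual types):

    fn main() {
        let enable_page_index = true;
        let row_groups: Vec<usize> = vec![]; // every row group already pruned

        // `bool::then` evaluates the closure only when the guard is true,
        // so nothing is built when there is no data left to scan.
        let selection = (enable_page_index && !row_groups.is_empty())
            .then(|| format!("build page filter over {} row groups", row_groups.len()));

        assert!(selection.is_none());
    }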
@@ -914,7 +914,7 @@ mod tests {
datasource::file_format::{parquet::ParquetFormat, FileFormat},
physical_plan::collect,
};
- use arrow::array::Float32Array;
+ use arrow::array::{Float32Array, Int32Array};
use arrow::datatypes::DataType::Decimal128;
use arrow::record_batch::RecordBatch;
use arrow::{
@@ -955,9 +955,16 @@ mod tests {
predicate: Option<Expr>,
pushdown_predicate: bool,
) -> Result<Vec<RecordBatch>> {
- round_trip(batches, projection, schema, predicate, pushdown_predicate)
- .await
- .batches
+ round_trip(
+ batches,
+ projection,
+ schema,
+ predicate,
+ pushdown_predicate,
+ false,
+ )
+ .await
+ .batches
}
/// Writes each RecordBatch as an individual parquet file and then
@@ -969,6 +976,7 @@ mod tests {
schema: Option<SchemaRef>,
predicate: Option<Expr>,
pushdown_predicate: bool,
+ page_index_predicate: bool,
) -> RoundTripResult {
let file_schema = match schema {
Some(schema) => schema,
@@ -978,7 +986,7 @@ mod tests {
),
};
- let (meta, _files) = store_parquet(batches).await.unwrap();
+ let (meta, _files) = store_parquet(batches, page_index_predicate).await.unwrap();
let file_groups = meta.into_iter().map(Into::into).collect();
// prepare the scan
@@ -1003,6 +1011,10 @@ mod tests {
.with_reorder_filters(true);
}
+ if page_index_predicate {
+ parquet_exec = parquet_exec.with_enable_page_index(true);
+ }
+
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let parquet_exec = Arc::new(parquet_exec);
@@ -1220,7 +1232,8 @@ mod tests {
let filter = col("c2").eq(lit(2_i64));
// read/write the files:
- let rt = round_trip(vec![batch1, batch2], None, None, Some(filter), true).await;
+ let rt =
+ round_trip(vec![batch1, batch2], None, None, Some(filter), true, false).await;
let expected = vec![
"+----+----+----+",
"| c1 | c3 | c2 |",
@@ -1369,7 +1382,8 @@ mod tests {
let filter = col("c2").eq(lit(1_i64));
// read/write the files:
- let rt = round_trip(vec![batch1, batch2], None, None, Some(filter), true).await;
+ let rt =
+ round_trip(vec![batch1, batch2], None, None, Some(filter), true, false).await;
let expected = vec![
"+----+----+",
@@ -1690,6 +1704,33 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn parquet_page_index_exec_metrics() {
+ let c1: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(2)]));
+ let c2: ArrayRef = Arc::new(Int32Array::from(vec![Some(3), Some(4), Some(5)]));
+ let batch1 = create_batch(vec![("int", c1.clone())]);
+ let batch2 = create_batch(vec![("int", c2.clone())]);
+
+ let filter = col("int").eq(lit(4_i32));
+
+ let rt =
+ round_trip(vec![batch1, batch2], None, None, Some(filter), false, true).await;
+
+ let metrics = rt.parquet_exec.metrics().unwrap();
+
+ // assert the batches and some metrics
+ let expected = vec![
+ "+-----+", "| int |", "+-----+", "| 3 |", "| 4 |", "| 5 |", "+-----+",
+ ];
+ assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
+ assert_eq!(get_value(&metrics, "page_index_rows_filtered"), 3);
+ assert!(
+ get_value(&metrics, "page_index_eval_time") > 0,
+ "no eval time in metrics: {:#?}",
+ metrics
+ );
+ }
+
#[tokio::test]
async fn parquet_exec_metrics() {
let c1: ArrayRef = Arc::new(StringArray::from(vec![
@@ -1709,7 +1750,7 @@ mod tests {
let filter = col("c1").not_eq(lit("bar"));
// read/write the files:
- let rt = round_trip(vec![batch1], None, None, Some(filter), true).await;
+ let rt = round_trip(vec![batch1], None, None, Some(filter), true, false).await;
let metrics = rt.parquet_exec.metrics().unwrap();
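The new parquet_page_index_exec_metrics test reads the counters back through a get_value helper defined elsewhere in the test module (not shown in this diff). A hedged sketch of how such a lookup can be written against DataFusion's MetricsSet API; the helper body and panic message here are illustrative, not the commit's exact code:

    use datafusion::physical_plan::metrics::MetricsSet;

    // Sum every metric in the set whose name matches, and panic if the
    // metric was never registered.
    fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize {
        match metrics.sum(|m| m.value().name() == metric_name) {
            Some(v) => v.as_usize(),
            None => panic!("no metric '{}' in {:#?}", metric_name, metrics),
        }
    }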
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/metrics.rs b/datafusion/core/src/physical_plan/file_format/parquet/metrics.rs
index 58e340e62..64e08753a 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/metrics.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/metrics.rs
@@ -35,6 +35,10 @@ pub struct ParquetFileMetrics {
pub pushdown_rows_filtered: Count,
/// Total time spent evaluating pushdown filters
pub pushdown_eval_time: Time,
+ /// Total rows filtered out by parquet page index
+ pub page_index_rows_filtered: Count,
+ /// Total time spent evaluating parquet page index filters
+ pub page_index_eval_time: Time,
}
impl ParquetFileMetrics {
@@ -63,6 +67,13 @@ impl ParquetFileMetrics {
let pushdown_eval_time = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.subset_time("pushdown_eval_time", partition);
+ let page_index_rows_filtered = MetricBuilder::new(metrics)
+ .with_new_label("filename", filename.to_string())
+ .counter("page_index_rows_filtered", partition);
+
+ let page_index_eval_time = MetricBuilder::new(metrics)
+ .with_new_label("filename", filename.to_string())
+ .subset_time("page_index_eval_time", partition);
Self {
predicate_evaluation_errors,
@@ -70,6 +81,8 @@ impl ParquetFileMetrics {
bytes_scanned,
pushdown_rows_filtered,
pushdown_eval_time,
+ page_index_rows_filtered,
+ page_index_eval_time,
}
}
}
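Count and Time here are DataFusion's shared metric primitives: Count is an atomic counter, and Time accumulates elapsed nanoseconds, typically through a scoped guard that records on drop; that is the same pattern build_page_filter uses below. A small sketch of their semantics, assuming datafusion::physical_plan::metrics:

    use datafusion::physical_plan::metrics::{Count, Time};

    fn main() {
        let rows_filtered = Count::new();
        let eval_time = Time::new();

        {
            // The guard adds the elapsed wall-clock time to `eval_time`
            // when it goes out of scope.
            let _timer = eval_time.timer();
            rows_filtered.add(128);
        }

        assert_eq!(rows_filtered.value(), 128);
        assert!(eval_time.value() > 0); // value is in nanoseconds
    }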
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
index 37002af87..828d21375 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
@@ -100,6 +100,8 @@ pub(crate) fn build_page_filter(
file_metadata: &ParquetMetaData,
file_metrics: &ParquetFileMetrics,
) -> Result<Option<RowSelection>> {
+ // scoped timer: adds the elapsed time to `page_index_eval_time` on drop
+ let _timer_guard = file_metrics.page_index_eval_time.timer();
let page_index_predicates =
extract_page_index_push_down_predicates(pruning_predicate, schema)?;
@@ -154,6 +156,18 @@ pub(crate) fn build_page_filter(
row_selections.push_back(selectors.into_iter().flatten().collect::<Vec<_>>());
}
let final_selection = combine_multi_col_selection(row_selections);
+ let total_skip =
+ final_selection.iter().fold(
+ 0,
+ |acc, x| {
+ if x.skip {
+ acc + x.row_count
+ } else {
+ acc
+ }
+ },
+ );
+ file_metrics.page_index_rows_filtered.add(total_skip);
Ok(Some(final_selection.into()))
} else {
Ok(None)
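The fold above simply totals the row counts of the selectors marked skip. An equivalent, arguably more idiomatic form, assuming the parquet crate's RowSelector with its public skip and row_count fields:

    use parquet::arrow::arrow_reader::RowSelector;

    // Count how many rows the combined selection skips; this is the value
    // fed into the `page_index_rows_filtered` counter.
    fn total_skipped(final_selection: &[RowSelector]) -> usize {
        final_selection
            .iter()
            .filter(|s| s.skip)
            .map(|s| s.row_count)
            .sum()
    }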