You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/07 14:29:27 UTC

[arrow-datafusion] branch master updated: Add metrics for parquet page level skipping (#4105)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d23cae48 Add metrics for parquet page level skipping (#4105)
4d23cae48 is described below

commit 4d23cae481c8e04fead1f1569dab0dc28b477008
Author: Yang Jiang <ya...@ebay.com>
AuthorDate: Mon Nov 7 22:29:21 2022 +0800

    Add metrics for parquet page level skipping (#4105)
    
    * Add statistics for parquet page level skipping
    
    Signed-off-by: yangjiang <ya...@ebay.com>
    
    * use page row limit fix ut
    
    Signed-off-by: yangjiang <ya...@ebay.com>
    
    * fix.
    
    Signed-off-by: yangjiang <ya...@ebay.com>
    
    * remove todo
    
    Signed-off-by: yangjiang <ya...@ebay.com>
    
    Signed-off-by: yangjiang <ya...@ebay.com>
---
 .../core/src/datasource/file_format/parquet.rs     | 58 +++++++++++++--------
 .../core/src/physical_plan/file_format/parquet.rs  | 59 ++++++++++++++++++----
 .../physical_plan/file_format/parquet/metrics.rs   | 13 +++++
 .../file_format/parquet/page_filter.rs             | 14 +++++
 4 files changed, 115 insertions(+), 29 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 07819bdf5..3de58d456 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -532,25 +532,43 @@ pub(crate) mod test_util {
 
     pub async fn store_parquet(
         batches: Vec<RecordBatch>,
+        multi_page: bool,
     ) -> Result<(Vec<ObjectMeta>, Vec<NamedTempFile>)> {
-        let files: Vec<_> = batches
-            .into_iter()
-            .map(|batch| {
-                let mut output = NamedTempFile::new().expect("creating temp file");
-
-                let props = WriterProperties::builder().build();
-                let mut writer =
-                    ArrowWriter::try_new(&mut output, batch.schema(), Some(props))
-                        .expect("creating writer");
-
-                writer.write(&batch).expect("Writing batch");
-                writer.close().unwrap();
-                output
-            })
-            .collect();
-
-        let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect();
-        Ok((meta, files))
+        if multi_page {
+            // All batches write in to one file, each batch must have same schema.
+            let mut output = NamedTempFile::new().expect("creating temp file");
+            let mut builder = WriterProperties::builder();
+            builder = builder.set_data_page_row_count_limit(2);
+            let proper = builder.build();
+            let mut writer =
+                ArrowWriter::try_new(&mut output, batches[0].schema(), Some(proper))
+                    .expect("creating writer");
+            for b in batches {
+                writer.write(&b).expect("Writing batch");
+            }
+            writer.close().unwrap();
+            Ok((vec![local_unpartitioned_file(&output)], vec![output]))
+        } else {
+            // Each batch writes to their own file
+            let files: Vec<_> = batches
+                .into_iter()
+                .map(|batch| {
+                    let mut output = NamedTempFile::new().expect("creating temp file");
+
+                    let props = WriterProperties::builder().build();
+                    let mut writer =
+                        ArrowWriter::try_new(&mut output, batch.schema(), Some(props))
+                            .expect("creating writer");
+
+                    writer.write(&batch).expect("Writing batch");
+                    writer.close().unwrap();
+                    output
+                })
+                .collect();
+
+            let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect();
+            Ok((meta, files))
+        }
     }
 }
 
@@ -599,7 +617,7 @@ mod tests {
         let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)]).unwrap();
 
         let store = Arc::new(LocalFileSystem::new()) as _;
-        let (meta, _files) = store_parquet(vec![batch1, batch2]).await?;
+        let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?;
 
         let format = ParquetFormat::default();
         let schema = format.infer_schema(&store, &meta).await.unwrap();
@@ -738,7 +756,7 @@ mod tests {
         let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
             LocalFileSystem::new(),
         )));
-        let (meta, _files) = store_parquet(vec![batch1, batch2]).await?;
+        let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?;
 
         // Use a size hint larger than the parquet footer but smaller than the actual metadata, requiring a second fetch
         // for the remaining metadata
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 1f98dd88c..61d2e5bad 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -445,7 +445,7 @@ impl FileOpener for ParquetOpener {
             // page index pruning: if all data on individual pages can
             // be ruled using page metadata, rows from other columns
             // with that range can be skipped as well
-            if let Some(row_selection) = enable_page_index
+            if let Some(row_selection) = (enable_page_index && !row_groups.is_empty())
                 .then(|| {
                     page_filter::build_page_filter(
                         pruning_predicate.as_ref(),
@@ -914,7 +914,7 @@ mod tests {
         datasource::file_format::{parquet::ParquetFormat, FileFormat},
         physical_plan::collect,
     };
-    use arrow::array::Float32Array;
+    use arrow::array::{Float32Array, Int32Array};
     use arrow::datatypes::DataType::Decimal128;
     use arrow::record_batch::RecordBatch;
     use arrow::{
@@ -955,9 +955,16 @@ mod tests {
         predicate: Option<Expr>,
         pushdown_predicate: bool,
     ) -> Result<Vec<RecordBatch>> {
-        round_trip(batches, projection, schema, predicate, pushdown_predicate)
-            .await
-            .batches
+        round_trip(
+            batches,
+            projection,
+            schema,
+            predicate,
+            pushdown_predicate,
+            false,
+        )
+        .await
+        .batches
     }
 
     /// Writes each RecordBatch as an individual parquet file and then
@@ -969,6 +976,7 @@ mod tests {
         schema: Option<SchemaRef>,
         predicate: Option<Expr>,
         pushdown_predicate: bool,
+        page_index_predicate: bool,
     ) -> RoundTripResult {
         let file_schema = match schema {
             Some(schema) => schema,
@@ -978,7 +986,7 @@ mod tests {
             ),
         };
 
-        let (meta, _files) = store_parquet(batches).await.unwrap();
+        let (meta, _files) = store_parquet(batches, page_index_predicate).await.unwrap();
         let file_groups = meta.into_iter().map(Into::into).collect();
 
         // prepare the scan
@@ -1003,6 +1011,10 @@ mod tests {
                 .with_reorder_filters(true);
         }
 
+        if page_index_predicate {
+            parquet_exec = parquet_exec.with_enable_page_index(true);
+        }
+
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let parquet_exec = Arc::new(parquet_exec);
@@ -1220,7 +1232,8 @@ mod tests {
         let filter = col("c2").eq(lit(2_i64));
 
         // read/write them files:
-        let rt = round_trip(vec![batch1, batch2], None, None, Some(filter), true).await;
+        let rt =
+            round_trip(vec![batch1, batch2], None, None, Some(filter), true, false).await;
         let expected = vec![
             "+----+----+----+",
             "| c1 | c3 | c2 |",
@@ -1369,7 +1382,8 @@ mod tests {
         let filter = col("c2").eq(lit(1_i64));
 
         // read/write them files:
-        let rt = round_trip(vec![batch1, batch2], None, None, Some(filter), true).await;
+        let rt =
+            round_trip(vec![batch1, batch2], None, None, Some(filter), true, false).await;
 
         let expected = vec![
             "+----+----+",
@@ -1690,6 +1704,33 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn parquet_page_index_exec_metrics() {
+        let c1: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(2)]));
+        let c2: ArrayRef = Arc::new(Int32Array::from(vec![Some(3), Some(4), Some(5)]));
+        let batch1 = create_batch(vec![("int", c1.clone())]);
+        let batch2 = create_batch(vec![("int", c2.clone())]);
+
+        let filter = col("int").eq(lit(4_i32));
+
+        let rt =
+            round_trip(vec![batch1, batch2], None, None, Some(filter), false, true).await;
+
+        let metrics = rt.parquet_exec.metrics().unwrap();
+
+        // assert the batches and some metrics
+        let expected = vec![
+            "+-----+", "| int |", "+-----+", "| 3   |", "| 4   |", "| 5   |", "+-----+",
+        ];
+        assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
+        assert_eq!(get_value(&metrics, "page_index_rows_filtered"), 3);
+        assert!(
+            get_value(&metrics, "page_index_eval_time") > 0,
+            "no eval time in metrics: {:#?}",
+            metrics
+        );
+    }
+
     #[tokio::test]
     async fn parquet_exec_metrics() {
         let c1: ArrayRef = Arc::new(StringArray::from(vec![
@@ -1709,7 +1750,7 @@ mod tests {
         let filter = col("c1").not_eq(lit("bar"));
 
         // read/write them files:
-        let rt = round_trip(vec![batch1], None, None, Some(filter), true).await;
+        let rt = round_trip(vec![batch1], None, None, Some(filter), true, false).await;
 
         let metrics = rt.parquet_exec.metrics().unwrap();
 
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/metrics.rs b/datafusion/core/src/physical_plan/file_format/parquet/metrics.rs
index 58e340e62..64e08753a 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/metrics.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/metrics.rs
@@ -35,6 +35,10 @@ pub struct ParquetFileMetrics {
     pub pushdown_rows_filtered: Count,
     /// Total time spent evaluating pushdown filters
     pub pushdown_eval_time: Time,
+    /// Total rows filtered out by parquet page index
+    pub page_index_rows_filtered: Count,
+    /// Total time spent evaluating parquet page index filters
+    pub page_index_eval_time: Time,
 }
 
 impl ParquetFileMetrics {
@@ -63,6 +67,13 @@ impl ParquetFileMetrics {
         let pushdown_eval_time = MetricBuilder::new(metrics)
             .with_new_label("filename", filename.to_string())
             .subset_time("pushdown_eval_time", partition);
+        let page_index_rows_filtered = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .counter("page_index_rows_filtered", partition);
+
+        let page_index_eval_time = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .subset_time("page_index_eval_time", partition);
 
         Self {
             predicate_evaluation_errors,
@@ -70,6 +81,8 @@ impl ParquetFileMetrics {
             bytes_scanned,
             pushdown_rows_filtered,
             pushdown_eval_time,
+            page_index_rows_filtered,
+            page_index_eval_time,
         }
     }
 }
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
index 37002af87..828d21375 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
@@ -100,6 +100,8 @@ pub(crate) fn build_page_filter(
     file_metadata: &ParquetMetaData,
     file_metrics: &ParquetFileMetrics,
 ) -> Result<Option<RowSelection>> {
+    // scoped timer updates on drop
+    let _timer_guard = file_metrics.page_index_eval_time.timer();
     let page_index_predicates =
         extract_page_index_push_down_predicates(pruning_predicate, schema)?;
 
@@ -154,6 +156,18 @@ pub(crate) fn build_page_filter(
             row_selections.push_back(selectors.into_iter().flatten().collect::<Vec<_>>());
         }
         let final_selection = combine_multi_col_selection(row_selections);
+        let total_skip =
+            final_selection.iter().fold(
+                0,
+                |acc, x| {
+                    if x.skip {
+                        acc + x.row_count
+                    } else {
+                        acc
+                    }
+                },
+            );
+        file_metrics.page_index_rows_filtered.add(total_skip);
         Ok(Some(final_selection.into()))
     } else {
         Ok(None)