Posted to commits@arrow.apache.org by ag...@apache.org on 2022/04/21 16:12:15 UTC

[arrow-datafusion] branch master updated: Add bytes scanned metric to ParquetExec (#2273)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b8f1d59c Add bytes scanned metric to ParquetExec (#2273)
4b8f1d59c is described below

commit 4b8f1d59ccbe5f00471eddd7f8ef23a52d9bbc91
Author: Dan Harris <13...@users.noreply.github.com>
AuthorDate: Thu Apr 21 12:12:09 2022 -0400

    Add bytes scanned metric to ParquetExec (#2273)
---
 .../core/src/datasource/file_format/parquet.rs     | 62 +++++++++++++++++++---
 .../core/src/physical_plan/file_format/parquet.rs  | 13 ++++-
 2 files changed, 68 insertions(+), 7 deletions(-)
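
For anyone who wants to poke at the new metric, here is a minimal sketch of how the counter can be read back from an executed plan. It mirrors the assert_bytes_scanned helper added in the test below; the function name total_bytes_scanned and the datafusion:: import paths are illustrative and not part of this commit.

    use std::sync::Arc;

    use datafusion::physical_plan::metrics::MetricValue;
    use datafusion::physical_plan::ExecutionPlan;

    /// Sum every Count metric named "bytes_scanned" recorded by the plan
    /// (one counter per file/partition) into a single total, if metrics
    /// were collected at all.
    fn total_bytes_scanned(exec: &Arc<dyn ExecutionPlan>) -> Option<usize> {
        exec.metrics()?
            .sum(|m| matches!(m.value(), MetricValue::Count { name, .. } if name == "bytes_scanned"))
            .map(|v| v.as_usize())
    }

The new test uses the same kind of sum to check that projecting a single column of alltypes_plain.parquet scans fewer bytes (1924) than reading the whole file (2522).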

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 18168d95f..daadff97d 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -48,7 +48,7 @@ use crate::logical_plan::combine_filters;
 use crate::logical_plan::Expr;
 use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
 use crate::physical_plan::file_format::{ParquetExec, SchemaAdapter};
-use crate::physical_plan::ExecutionPlan;
+use crate::physical_plan::{metrics, ExecutionPlan};
 use crate::physical_plan::{Accumulator, Statistics};
 use datafusion_data_access::object_store::{ObjectReader, ObjectReaderStream};
 
@@ -275,7 +275,10 @@ fn summarize_min_max(
 
 /// Read and parse the schema of the Parquet file at location `path`
 fn fetch_schema(object_reader: Arc<dyn ObjectReader>) -> Result<Schema> {
-    let obj_reader = ChunkObjectReader(object_reader);
+    let obj_reader = ChunkObjectReader {
+        object_reader,
+        bytes_scanned: None,
+    };
     let file_reader = Arc::new(SerializedFileReader::new(obj_reader)?);
     let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
     let schema = arrow_reader.get_schema()?;
@@ -288,7 +291,10 @@ fn fetch_statistics(
     object_reader: Arc<dyn ObjectReader>,
     table_schema: SchemaRef,
 ) -> Result<Statistics> {
-    let obj_reader = ChunkObjectReader(object_reader);
+    let obj_reader = ChunkObjectReader {
+        object_reader,
+        bytes_scanned: None,
+    };
     let file_reader = Arc::new(SerializedFileReader::new(obj_reader)?);
     let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
     let file_schema = arrow_reader.get_schema()?;
@@ -362,11 +368,16 @@ fn fetch_statistics(
 }
 
 /// A wrapper around the object reader to make it implement `ChunkReader`
-pub struct ChunkObjectReader(pub Arc<dyn ObjectReader>);
+pub struct ChunkObjectReader {
+    /// The underlying object reader
+    pub object_reader: Arc<dyn ObjectReader>,
+    /// Optional counter which will track total number of bytes scanned
+    pub bytes_scanned: Option<metrics::Count>,
+}
 
 impl Length for ChunkObjectReader {
     fn len(&self) -> u64 {
-        self.0.length()
+        self.object_reader.length()
     }
 }
 
@@ -374,7 +385,10 @@ impl ChunkReader for ChunkObjectReader {
     type T = Box<dyn Read + Send + Sync>;
 
     fn get_read(&self, start: u64, length: usize) -> ParquetResult<Self::T> {
-        self.0
+        if let Some(m) = self.bytes_scanned.as_ref() {
+            m.add(length)
+        }
+        self.object_reader
             .sync_chunk_reader(start, length)
             .map_err(DataFusionError::IoError)
             .map_err(|e| ParquetError::ArrowError(e.to_string()))
@@ -391,6 +405,7 @@ mod tests {
 
     use super::*;
 
+    use crate::physical_plan::metrics::MetricValue;
     use crate::prelude::{SessionConfig, SessionContext};
     use arrow::array::{
         ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
@@ -517,6 +532,30 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn capture_bytes_scanned_metric() -> Result<()> {
+        let config = SessionConfig::new().with_batch_size(2);
+        let ctx = SessionContext::with_config(config);
+        let projection = None;
+
+        // Read the full file
+        let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
+
+        // Read only one column. This should scan less data.
+        let exec_projected =
+            get_exec("alltypes_plain.parquet", &Some(vec![0]), None).await?;
+
+        let task_ctx = ctx.task_ctx();
+
+        let _ = collect(exec.clone(), task_ctx.clone()).await?;
+        let _ = collect(exec_projected.clone(), task_ctx).await?;
+
+        assert_bytes_scanned(exec, 2522);
+        assert_bytes_scanned(exec_projected, 1924);
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn read_limit() -> Result<()> {
         let session_ctx = SessionContext::new();
@@ -748,6 +787,17 @@ mod tests {
         Ok(())
     }
 
+    fn assert_bytes_scanned(exec: Arc<dyn ExecutionPlan>, expected: usize) {
+        let actual = exec
+            .metrics()
+            .expect("Metrics not recorded")
+            .sum(|metric| matches!(metric.value(), MetricValue::Count { name, .. } if name == "bytes_scanned"))
+            .map(|t| t.as_usize())
+            .expect("bytes_scanned metric not recorded");
+
+        assert_eq!(actual, expected);
+    }
+
     async fn get_exec(
         file_name: &str,
         projection: &Option<Vec<usize>>,
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index dd9817f95..df7327aa0 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -87,6 +87,8 @@ struct ParquetFileMetrics {
     pub predicate_evaluation_errors: metrics::Count,
     /// Number of row groups pruned using the pruning predicate
     pub row_groups_pruned: metrics::Count,
+    /// Total number of bytes scanned
+    pub bytes_scanned: metrics::Count,
 }
 
 impl ParquetExec {
@@ -151,9 +153,14 @@ impl ParquetFileMetrics {
             .with_new_label("filename", filename.to_string())
             .counter("row_groups_pruned", partition);
 
+        let bytes_scanned = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .counter("bytes_scanned", partition);
+
         Self {
             predicate_evaluation_errors,
             row_groups_pruned,
+            bytes_scanned,
         }
     }
 }
@@ -315,6 +322,7 @@ impl ParquetExecStream {
             file.file_meta.path(),
             &self.metrics,
         );
+        let bytes_scanned = file_metrics.bytes_scanned.clone();
         let object_reader = self
             .object_store
             .file_reader(file.file_meta.sized_file.clone())?;
@@ -336,7 +344,10 @@ impl ParquetExecStream {
         }
 
         let file_reader = SerializedFileReader::new_with_options(
-            ChunkObjectReader(object_reader),
+            ChunkObjectReader {
+                object_reader,
+                bytes_scanned: Some(bytes_scanned),
+            },
             opt.build(),
         )?;
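
The counter handed to the reader here is a clone of the one registered in ParquetFileMetrics, so as a closing note, a hedged sketch of why that works, assuming metrics::Count keeps its total behind a shared atomic (as its use in this diff implies) and exposes new(), add(), and value():

    use datafusion::physical_plan::metrics::Count;

    fn main() {
        // Assumed behaviour: cloning a Count shares one underlying total,
        // which is why ParquetFileMetrics can hand a clone to
        // ChunkObjectReader and still observe every add() made in get_read().
        let bytes_scanned = Count::new();
        let reader_side = bytes_scanned.clone();
        reader_side.add(1024); // one simulated 1 KiB chunk request
        assert_eq!(bytes_scanned.value(), 1024);
    }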