Posted to commits@arrow.apache.org by ag...@apache.org on 2022/04/21 16:12:15 UTC
[arrow-datafusion] branch master updated: Add bytes scanned metric to ParquetExec (#2273)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 4b8f1d59c Add bytes scanned metric to ParquetExec (#2273)
4b8f1d59c is described below
commit 4b8f1d59ccbe5f00471eddd7f8ef23a52d9bbc91
Author: Dan Harris <13...@users.noreply.github.com>
AuthorDate: Thu Apr 21 12:12:09 2022 -0400
Add bytes scanned metric to ParquetExec (#2273)
---
.../core/src/datasource/file_format/parquet.rs | 62 +++++++++++++++++++---
.../core/src/physical_plan/file_format/parquet.rs | 13 ++++-
2 files changed, 68 insertions(+), 7 deletions(-)
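The heart of the change: ChunkObjectReader grows from a tuple struct into a named struct carrying an optional metrics::Count, and ChunkReader::get_read bumps that counter by the requested length before delegating to the wrapped object reader. As a rough sketch of the pattern in isolation (SharedCounter and CountingReader below are hypothetical stand-ins, not DataFusion's actual types, which appear in the diff that follows):

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for `metrics::Count`: a cheaply clonable,
/// thread-safe counter whose clones all share one underlying value.
#[derive(Clone, Default)]
struct SharedCounter(Arc<AtomicUsize>);

impl SharedCounter {
    fn add(&self, n: usize) {
        self.0.fetch_add(n, Ordering::Relaxed);
    }
    fn value(&self) -> usize {
        self.0.load(Ordering::Relaxed)
    }
}

/// Hypothetical reader wrapper mirroring the commit's shape: the
/// counter is optional, so callers that only probe metadata skip it.
struct CountingReader {
    bytes_scanned: Option<SharedCounter>,
}

impl CountingReader {
    /// Record a chunk read of `length` bytes if a counter is attached.
    fn get_read(&self, _start: u64, length: usize) {
        if let Some(c) = self.bytes_scanned.as_ref() {
            c.add(length);
        }
        // ...delegate the actual read to the wrapped object reader...
    }
}

fn main() {
    let counter = SharedCounter::default();
    let reader = CountingReader { bytes_scanned: Some(counter.clone()) };
    reader.get_read(0, 1024);
    reader.get_read(1024, 898);
    assert_eq!(counter.value(), 1922);
}

Keeping the counter an Option means the metadata-only paths (fetch_schema, fetch_statistics) pay nothing for the metric, as the first diff below shows.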
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 18168d95f..daadff97d 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -48,7 +48,7 @@ use crate::logical_plan::combine_filters;
use crate::logical_plan::Expr;
use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
use crate::physical_plan::file_format::{ParquetExec, SchemaAdapter};
-use crate::physical_plan::ExecutionPlan;
+use crate::physical_plan::{metrics, ExecutionPlan};
use crate::physical_plan::{Accumulator, Statistics};
use datafusion_data_access::object_store::{ObjectReader, ObjectReaderStream};
@@ -275,7 +275,10 @@ fn summarize_min_max(
/// Read and parse the schema of the Parquet file at location `path`
fn fetch_schema(object_reader: Arc<dyn ObjectReader>) -> Result<Schema> {
- let obj_reader = ChunkObjectReader(object_reader);
+ let obj_reader = ChunkObjectReader {
+ object_reader,
+ bytes_scanned: None,
+ };
let file_reader = Arc::new(SerializedFileReader::new(obj_reader)?);
let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
let schema = arrow_reader.get_schema()?;
@@ -288,7 +291,10 @@ fn fetch_statistics(
object_reader: Arc<dyn ObjectReader>,
table_schema: SchemaRef,
) -> Result<Statistics> {
- let obj_reader = ChunkObjectReader(object_reader);
+ let obj_reader = ChunkObjectReader {
+ object_reader,
+ bytes_scanned: None,
+ };
let file_reader = Arc::new(SerializedFileReader::new(obj_reader)?);
let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
let file_schema = arrow_reader.get_schema()?;
@@ -362,11 +368,16 @@ fn fetch_statistics(
}
/// A wrapper around the object reader to make it implement `ChunkReader`
-pub struct ChunkObjectReader(pub Arc<dyn ObjectReader>);
+pub struct ChunkObjectReader {
+ /// The underlying object reader
+ pub object_reader: Arc<dyn ObjectReader>,
+ /// Optional counter that tracks the total number of bytes scanned
+ pub bytes_scanned: Option<metrics::Count>,
+}
impl Length for ChunkObjectReader {
fn len(&self) -> u64 {
- self.0.length()
+ self.object_reader.length()
}
}
@@ -374,7 +385,10 @@ impl ChunkReader for ChunkObjectReader {
type T = Box<dyn Read + Send + Sync>;
fn get_read(&self, start: u64, length: usize) -> ParquetResult<Self::T> {
- self.0
+ if let Some(m) = self.bytes_scanned.as_ref() {
+ m.add(length)
+ }
+ self.object_reader
.sync_chunk_reader(start, length)
.map_err(DataFusionError::IoError)
.map_err(|e| ParquetError::ArrowError(e.to_string()))
@@ -391,6 +405,7 @@ mod tests {
use super::*;
+ use crate::physical_plan::metrics::MetricValue;
use crate::prelude::{SessionConfig, SessionContext};
use arrow::array::{
ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
@@ -517,6 +532,30 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn capture_bytes_scanned_metric() -> Result<()> {
+ let config = SessionConfig::new().with_batch_size(2);
+ let ctx = SessionContext::with_config(config);
+ let projection = None;
+
+ // Read the full file
+ let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
+
+ // Read only one column. This should scan less data.
+ let exec_projected =
+ get_exec("alltypes_plain.parquet", &Some(vec![0]), None).await?;
+
+ let task_ctx = ctx.task_ctx();
+
+ let _ = collect(exec.clone(), task_ctx.clone()).await?;
+ let _ = collect(exec_projected.clone(), task_ctx).await?;
+
+ assert_bytes_scanned(exec, 2522);
+ assert_bytes_scanned(exec_projected, 1924);
+
+ Ok(())
+ }
+
#[tokio::test]
async fn read_limit() -> Result<()> {
let session_ctx = SessionContext::new();
@@ -748,6 +787,17 @@ mod tests {
Ok(())
}
+ fn assert_bytes_scanned(exec: Arc<dyn ExecutionPlan>, expected: usize) {
+ let actual = exec
+ .metrics()
+ .expect("Metrics not recorded")
+ .sum(|metric| matches!(metric.value(), MetricValue::Count { name, .. } if name == "bytes_scanned"))
+ .map(|t| t.as_usize())
+ .expect("bytes_scanned metric not recorded");
+
+ assert_eq!(actual, expected);
+ }
+
async fn get_exec(
file_name: &str,
projection: &Option<Vec<usize>>,
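Once the plan has executed, the counter comes back out through ExecutionPlan::metrics(), exactly as the assert_bytes_scanned helper above does. A hedged usage sketch (assumed to live in the same module as the test so the imports resolve; total_bytes_scanned is a hypothetical helper name):

use crate::physical_plan::metrics::MetricValue;
use crate::physical_plan::ExecutionPlan;

/// Sum every "bytes_scanned" Count across all files and partitions;
/// returns None if the plan recorded no metrics or none match.
fn total_bytes_scanned(exec: &dyn ExecutionPlan) -> Option<usize> {
    exec.metrics()?
        .sum(|m| matches!(m.value(), MetricValue::Count { name, .. } if name == "bytes_scanned"))
        .map(|v| v.as_usize())
}

The second file's diff, below, shows where that counter is created and attached to each file reader.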
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index dd9817f95..df7327aa0 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -87,6 +87,8 @@ struct ParquetFileMetrics {
pub predicate_evaluation_errors: metrics::Count,
/// Number of row groups pruned using statistics
pub row_groups_pruned: metrics::Count,
+ /// Total number of bytes scanned
+ pub bytes_scanned: metrics::Count,
}
impl ParquetExec {
@@ -151,9 +153,14 @@ impl ParquetFileMetrics {
.with_new_label("filename", filename.to_string())
.counter("row_groups_pruned", partition);
+ let bytes_scanned = MetricBuilder::new(metrics)
+ .with_new_label("filename", filename.to_string())
+ .counter("bytes_scanned", partition);
+
Self {
predicate_evaluation_errors,
row_groups_pruned,
+ bytes_scanned,
}
}
}
@@ -315,6 +322,7 @@ impl ParquetExecStream {
file.file_meta.path(),
&self.metrics,
);
+ let bytes_scanned = file_metrics.bytes_scanned.clone();
let object_reader = self
.object_store
.file_reader(file.file_meta.sized_file.clone())?;
@@ -336,7 +344,10 @@ impl ParquetExecStream {
}
let file_reader = SerializedFileReader::new_with_options(
- ChunkObjectReader(object_reader),
+ ChunkObjectReader {
+ object_reader,
+ bytes_scanned: Some(bytes_scanned),
+ },
opt.build(),
)?;
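One detail worth noting in the hunk above: file_metrics.bytes_scanned is cloned before being moved into ChunkObjectReader, yet the registered metric still sees every update, because metrics::Count is a shared handle (an Arc-wrapped atomic under the hood), so all clones point at the same value. A minimal sketch of the registration step in isolation, assuming metrics, filename, and partition are in scope as in ParquetFileMetrics::new:

// Register a per-file counter; the "filename" label keeps counts from
// different files within the same partition distinguishable.
let bytes_scanned = MetricBuilder::new(metrics)
    .with_new_label("filename", filename.to_string())
    .counter("bytes_scanned", partition);

// A clone is another handle to the same counter, so the reader can
// record bytes while the MetricsSet retains visibility into the total.
let for_reader = bytes_scanned.clone();
for_reader.add(512);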