You are viewing a plain-text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to commits@arrow.apache.org by dh...@apache.org on 2021/07/04 08:54:15 UTC
[arrow-datafusion] branch master updated: Implement metrics for HashJoinExec (#664)
This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new e036a62 Implement metrics for HashJoinExec (#664)
e036a62 is described below
commit e036a626d3ac41fb3c414371ceb1e250af14f11a
Author: Andy Grove <an...@gmail.com>
AuthorDate: Sun Jul 4 02:53:56 2021 -0600
Implement metrics for HashJoinExec (#664)
---
datafusion/src/physical_plan/hash_join.rs | 103 +++++++++++++++++++-----------
1 file changed, 66 insertions(+), 37 deletions(-)
diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs
index 195a19c..f426bc9 100644
--- a/datafusion/src/physical_plan/hash_join.rs
+++ b/datafusion/src/physical_plan/hash_join.rs
@@ -64,7 +64,7 @@ use super::{
SendableRecordBatchStream,
};
use crate::physical_plan::coalesce_batches::concat_batches;
-use crate::physical_plan::PhysicalExpr;
+use crate::physical_plan::{PhysicalExpr, SQLMetric};
use log::debug;
// Maps a `u64` hash value based on the left ["on" values] to a list of indices with this key's value.
@@ -102,6 +102,35 @@ pub struct HashJoinExec {
random_state: RandomState,
/// Partitioning mode to use
mode: PartitionMode,
+ /// Metrics
+ metrics: Arc<HashJoinMetrics>,
+}
+
+/// Execution metrics for [`HashJoinExec`], shared via `Arc` with each `HashJoinStream` it creates
+#[derive(Debug)]
+struct HashJoinMetrics {
+ /// Total time spent joining probe-side batches to the build-side batches (a `time_nanos` metric)
+ join_time: Arc<SQLMetric>,
+ /// Number of batches consumed (NOTE(review): also bumped for the final left-side batch — confirm intent)
+ input_batches: Arc<SQLMetric>,
+ /// Number of rows consumed by this operator (same counting caveat as `input_batches`)
+ input_rows: Arc<SQLMetric>,
+ /// Number of output batches produced by this operator
+ output_batches: Arc<SQLMetric>,
+ /// Number of rows in the output batches produced by this operator
+ output_rows: Arc<SQLMetric>,
+}
+
+impl HashJoinMetrics {
+ fn new() -> Self {
+ Self {
+ join_time: SQLMetric::time_nanos(),
+ input_batches: SQLMetric::counter(),
+ input_rows: SQLMetric::counter(),
+ output_batches: SQLMetric::counter(),
+ output_rows: SQLMetric::counter(),
+ }
+ }
}
#[derive(Clone, Copy, Debug, PartialEq)]
@@ -154,6 +183,7 @@ impl HashJoinExec {
build_side: Arc::new(Mutex::new(None)),
random_state,
mode: partition_mode,
+ metrics: Arc::new(HashJoinMetrics::new()),
})
}
@@ -394,6 +424,7 @@ impl ExecutionPlan for HashJoinExec {
column_indices,
self.random_state.clone(),
visited_left_side,
+ self.metrics.clone(),
)))
}
@@ -412,6 +443,22 @@ impl ExecutionPlan for HashJoinExec {
}
}
}
+
+ /// Returns this operator's metrics, keyed by metric name.
+ fn metrics(&self) -> HashMap<String, SQLMetric> {
+ // Clone each metric out of its shared `Arc` so the map owns its values.
+ let m = &self.metrics;
+ [
+ ("joinTime", &m.join_time),
+ ("inputBatches", &m.input_batches),
+ ("inputRows", &m.input_rows),
+ ("outputBatches", &m.output_batches),
+ ("outputRows", &m.output_rows),
+ ]
+ .iter()
+ .map(|&(name, metric)| (name.to_owned(), SQLMetric::clone(metric)))
+ .collect()
+ }
}
/// Updates `hash` with new entries from [RecordBatch] evaluated against the expressions `on`,
@@ -467,22 +514,14 @@ struct HashJoinStream {
right: SendableRecordBatchStream,
/// Information of index and left / right placement of columns
column_indices: Vec<ColumnIndex>,
- /// number of input batches
- num_input_batches: usize,
- /// number of input rows
- num_input_rows: usize,
- /// number of batches produced
- num_output_batches: usize,
- /// number of rows produced
- num_output_rows: usize,
- /// total time for joining probe-side batches to the build-side batches
- join_time: usize,
/// Random state used for hashing initialization
random_state: RandomState,
/// Keeps track of the left side rows whether they are visited
visited_left_side: Vec<bool>, // TODO: use a more memory efficient data structure, https://github.com/apache/arrow-datafusion/issues/240
/// There is nothing to process anymore and left side is processed in case of left join
is_exhausted: bool,
+ /// Metrics
+ metrics: Arc<HashJoinMetrics>,
}
#[allow(clippy::too_many_arguments)]
@@ -497,6 +536,7 @@ impl HashJoinStream {
column_indices: Vec<ColumnIndex>,
random_state: RandomState,
visited_left_side: Vec<bool>,
+ metrics: Arc<HashJoinMetrics>,
) -> Self {
HashJoinStream {
schema,
@@ -506,14 +546,10 @@ impl HashJoinStream {
left_data,
right,
column_indices,
- num_input_batches: 0,
- num_input_rows: 0,
- num_output_batches: 0,
- num_output_rows: 0,
- join_time: 0,
random_state,
visited_left_side,
is_exhausted: false,
+ metrics,
}
}
}
@@ -1215,12 +1251,14 @@ impl Stream for HashJoinStream {
&self.column_indices,
&self.random_state,
);
- self.num_input_batches += 1;
- self.num_input_rows += batch.num_rows();
+ self.metrics.input_batches.add(1);
+ self.metrics.input_rows.add(batch.num_rows());
if let Ok((ref batch, ref left_side)) = result {
- self.join_time += start.elapsed().as_millis() as usize;
- self.num_output_batches += 1;
- self.num_output_rows += batch.num_rows();
+ self.metrics
+ .join_time
+ .add(start.elapsed().as_nanos() as usize); // join_time is a `time_nanos` metric
+ self.metrics.output_batches.add(1);
+ self.metrics.output_rows.add(batch.num_rows());
match self.join_type {
JoinType::Left
@@ -1254,13 +1292,14 @@ impl Stream for HashJoinStream {
self.join_type != JoinType::Semi,
);
if let Ok(ref batch) = result {
- self.num_input_batches += 1;
- self.num_input_rows += batch.num_rows();
+ self.metrics.input_batches.add(1);
+ self.metrics.input_rows.add(batch.num_rows());
if let Ok(ref batch) = result {
- self.join_time +=
- start.elapsed().as_millis() as usize;
- self.num_output_batches += 1;
- self.num_output_rows += batch.num_rows();
+ self.metrics
+ .join_time
+ .add(start.elapsed().as_nanos() as usize); // join_time is a `time_nanos` metric
+ self.metrics.output_batches.add(1);
+ self.metrics.output_rows.add(batch.num_rows());
}
}
self.is_exhausted = true;
@@ -1274,16 +1313,6 @@ impl Stream for HashJoinStream {
| JoinType::Right => {}
}
- // End of right batch, print stats in debug mode
- debug!(
- "Processed {} probe-side input batches containing {} rows and \
- produced {} output batches containing {} rows in {} ms",
- self.num_input_batches,
- self.num_input_rows,
- self.num_output_batches,
- self.num_output_rows,
- self.join_time
- );
other
}
})