You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/29 17:18:14 UTC
[arrow-datafusion] branch master updated: improve hashjoin execution metrics (#4394)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 66c95e70a improve hashjoin execution metrics (#4394)
66c95e70a is described below
commit 66c95e70ae2ff9f3f89b91898ede875d316e731f
Author: AssHero <hu...@gmail.com>
AuthorDate: Wed Nov 30 01:18:09 2022 +0800
improve hashjoin execution metrics (#4394)
* improve hashjoin execution metrics
* improve hashjoin execution metrics
---
.../core/src/physical_plan/joins/hash_join.rs | 22 +++++++++++++++-------
1 file changed, 15 insertions(+), 7 deletions(-)
diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs
index 3017df623..82f4ec7c2 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -149,7 +149,9 @@ pub struct HashJoinExec {
#[derive(Debug)]
struct HashJoinMetrics {
/// Total time for joining probe-side batches to the build-side batches
- join_time: metrics::Time,
+ probe_time: metrics::Time,
+ /// Total time spent building the hash map
+ build_time: metrics::Time,
/// Number of batches consumed by this operator
input_batches: metrics::Count,
/// Number of rows consumed by this operator
@@ -162,7 +164,9 @@ struct HashJoinMetrics {
impl HashJoinMetrics {
pub fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
- let join_time = MetricBuilder::new(metrics).subset_time("join_time", partition);
+ let probe_time = MetricBuilder::new(metrics).subset_time("probe_time", partition);
+
+ let build_time = MetricBuilder::new(metrics).subset_time("build_time", partition);
let input_batches =
MetricBuilder::new(metrics).counter("input_batches", partition);
@@ -175,7 +179,8 @@ impl HashJoinMetrics {
let output_rows = MetricBuilder::new(metrics).output_rows(partition);
Self {
- join_time,
+ probe_time,
+ build_time,
input_batches,
input_rows,
output_batches,
@@ -1487,10 +1492,12 @@ impl HashJoinStream {
&mut self,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<ArrowResult<RecordBatch>>> {
+ let build_timer = self.join_metrics.build_time.timer();
let left_data = match ready!(self.left_fut.get(cx)) {
Ok(left_data) => left_data,
Err(e) => return Poll::Ready(Some(Err(e))),
};
+ build_timer.done();
let visited_left_side = self.visited_left_side.get_or_insert_with(|| {
let num_rows = left_data.1.num_rows();
@@ -1516,7 +1523,7 @@ impl HashJoinStream {
.poll_next_unpin(cx)
.map(|maybe_batch| match maybe_batch {
Some(Ok(batch)) => {
- let timer = self.join_metrics.join_time.timer();
+ let timer = self.join_metrics.probe_time.timer();
let result = build_batch(
&batch,
left_data,
@@ -1532,7 +1539,6 @@ impl HashJoinStream {
self.join_metrics.input_batches.add(1);
self.join_metrics.input_rows.add(batch.num_rows());
if let Ok((ref batch, ref left_side)) = result {
- timer.done();
self.join_metrics.output_batches.add(1);
self.join_metrics.output_rows.add(batch.num_rows());
@@ -1551,11 +1557,13 @@ impl HashJoinStream {
| JoinType::RightAnti => {}
}
}
- Some(result.map(|x| x.0))
+ let final_result = Some(result.map(|x| x.0));
+ timer.done();
+ final_result
}
Some(err) => Some(err),
None => {
- let timer = self.join_metrics.join_time.timer();
+ let timer = self.join_metrics.probe_time.timer();
// For the left join, produce rows for unmatched rows
match self.join_type {
JoinType::Left