You are viewing a plain-text version of this content. The link to its canonical version (the "here" link in the original page) is not preserved in this plain-text rendering.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/29 17:18:14 UTC

[arrow-datafusion] branch master updated: improve hashjoin execution metrics (#4394)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 66c95e70a improve hashjoin execution metrics (#4394)
66c95e70a is described below

commit 66c95e70ae2ff9f3f89b91898ede875d316e731f
Author: AssHero <hu...@gmail.com>
AuthorDate: Wed Nov 30 01:18:09 2022 +0800

    improve hashjoin execution metrics (#4394)
    
    * improve hashjoin execution metrics
    
    * improve hashjoin execution metrics
---
 .../core/src/physical_plan/joins/hash_join.rs      | 22 +++++++++++++++-------
 1 file changed, 15 insertions(+), 7 deletions(-)

diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs
index 3017df623..82f4ec7c2 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -149,7 +149,9 @@ pub struct HashJoinExec {
 #[derive(Debug)]
 struct HashJoinMetrics {
     /// Total time for joining probe-side batches to the build-side batches
-    join_time: metrics::Time,
+    probe_time: metrics::Time,
+    /// Total time for building hashmap
+    build_time: metrics::Time,
     /// Number of batches consumed by this operator
     input_batches: metrics::Count,
     /// Number of rows consumed by this operator
@@ -162,7 +164,9 @@ struct HashJoinMetrics {
 
 impl HashJoinMetrics {
     pub fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
-        let join_time = MetricBuilder::new(metrics).subset_time("join_time", partition);
+        let probe_time = MetricBuilder::new(metrics).subset_time("probe_time", partition);
+
+        let build_time = MetricBuilder::new(metrics).subset_time("build_time", partition);
 
         let input_batches =
             MetricBuilder::new(metrics).counter("input_batches", partition);
@@ -175,7 +179,8 @@ impl HashJoinMetrics {
         let output_rows = MetricBuilder::new(metrics).output_rows(partition);
 
         Self {
-            join_time,
+            probe_time,
+            build_time,
             input_batches,
             input_rows,
             output_batches,
@@ -1487,10 +1492,12 @@ impl HashJoinStream {
         &mut self,
         cx: &mut std::task::Context<'_>,
     ) -> Poll<Option<ArrowResult<RecordBatch>>> {
+        let build_timer = self.join_metrics.build_time.timer();
         let left_data = match ready!(self.left_fut.get(cx)) {
             Ok(left_data) => left_data,
             Err(e) => return Poll::Ready(Some(Err(e))),
         };
+        build_timer.done();
 
         let visited_left_side = self.visited_left_side.get_or_insert_with(|| {
             let num_rows = left_data.1.num_rows();
@@ -1516,7 +1523,7 @@ impl HashJoinStream {
             .poll_next_unpin(cx)
             .map(|maybe_batch| match maybe_batch {
                 Some(Ok(batch)) => {
-                    let timer = self.join_metrics.join_time.timer();
+                    let timer = self.join_metrics.probe_time.timer();
                     let result = build_batch(
                         &batch,
                         left_data,
@@ -1532,7 +1539,6 @@ impl HashJoinStream {
                     self.join_metrics.input_batches.add(1);
                     self.join_metrics.input_rows.add(batch.num_rows());
                     if let Ok((ref batch, ref left_side)) = result {
-                        timer.done();
                         self.join_metrics.output_batches.add(1);
                         self.join_metrics.output_rows.add(batch.num_rows());
 
@@ -1551,11 +1557,13 @@ impl HashJoinStream {
                             | JoinType::RightAnti => {}
                         }
                     }
-                    Some(result.map(|x| x.0))
+                    let final_result = Some(result.map(|x| x.0));
+                    timer.done();
+                    final_result
                 }
                 Some(err) => Some(err),
                 None => {
-                    let timer = self.join_metrics.join_time.timer();
+                    let timer = self.join_metrics.probe_time.timer();
                     // For the left join, produce rows for unmatched rows
                     match self.join_type {
                         JoinType::Left