You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this copy.
Posted to commits@arrow.apache.org by dh...@apache.org on 2021/07/04 08:54:15 UTC

[arrow-datafusion] branch master updated: Implement metrics for HashJoinExec (#664)

This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new e036a62  Implement metrics for HashJoinExec (#664)
e036a62 is described below

commit e036a626d3ac41fb3c414371ceb1e250af14f11a
Author: Andy Grove <an...@gmail.com>
AuthorDate: Sun Jul 4 02:53:56 2021 -0600

    Implement metrics for HashJoinExec (#664)
---
 datafusion/src/physical_plan/hash_join.rs | 103 +++++++++++++++++++-----------
 1 file changed, 66 insertions(+), 37 deletions(-)

diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs
index 195a19c..f426bc9 100644
--- a/datafusion/src/physical_plan/hash_join.rs
+++ b/datafusion/src/physical_plan/hash_join.rs
@@ -64,7 +64,7 @@ use super::{
     SendableRecordBatchStream,
 };
 use crate::physical_plan::coalesce_batches::concat_batches;
-use crate::physical_plan::PhysicalExpr;
+use crate::physical_plan::{PhysicalExpr, SQLMetric};
 use log::debug;
 
 // Maps a `u64` hash value based on the left ["on" values] to a list of indices with this key's value.
@@ -102,6 +102,35 @@ pub struct HashJoinExec {
     random_state: RandomState,
     /// Partitioning mode to use
     mode: PartitionMode,
+    /// Metrics
+    metrics: Arc<HashJoinMetrics>,
+}
+
+/// Metrics for [`HashJoinExec`]: one `Arc` is created in `try_new` and cloned into each `HashJoinStream`, so counts presumably accumulate across all executed partitions
+#[derive(Debug)]
+struct HashJoinMetrics {
+    /// Total time for joining probe-side batches to the build-side batches (NOTE(review): created with `SQLMetric::time_nanos()`, but `poll_next` adds `as_millis()` values)
+    join_time: Arc<SQLMetric>,
+    /// Number of probe-side batches consumed (NOTE(review): `poll_next` also appears to add the final unmatched build-side batch here)
+    input_batches: Arc<SQLMetric>,
+    /// Number of probe-side rows consumed (NOTE(review): same caveat as `input_batches`)
+    input_rows: Arc<SQLMetric>,
+    /// Number of batches produced by this operator's streams
+    output_batches: Arc<SQLMetric>,
+    /// Number of rows produced by this operator's streams
+    output_rows: Arc<SQLMetric>,
+}
+
+impl HashJoinMetrics {
+    fn new() -> Self {
+        Self {
+            join_time: SQLMetric::time_nanos(), // presumably expects nanoseconds (per the name) — verify what poll_next adds
+            input_batches: SQLMetric::counter(),
+            input_rows: SQLMetric::counter(),
+            output_batches: SQLMetric::counter(),
+            output_rows: SQLMetric::counter(),
+        }
+    }
 }
 
 #[derive(Clone, Copy, Debug, PartialEq)]
@@ -154,6 +183,7 @@ impl HashJoinExec {
             build_side: Arc::new(Mutex::new(None)),
             random_state,
             mode: partition_mode,
+            metrics: Arc::new(HashJoinMetrics::new()),
         })
     }
 
@@ -394,6 +424,7 @@ impl ExecutionPlan for HashJoinExec {
             column_indices,
             self.random_state.clone(),
             visited_left_side,
+            self.metrics.clone(),
         )))
     }
 
@@ -412,6 +443,22 @@ impl ExecutionPlan for HashJoinExec {
             }
         }
     }
+
+    fn metrics(&self) -> HashMap<String, SQLMetric> {
+        let mut metrics = HashMap::new(); // camelCase key -> clone of the SQLMetric shared with the streams
+        metrics.insert("joinTime".to_owned(), (*self.metrics.join_time).clone()); // NOTE(review): recorded in ms by poll_next despite time_nanos
+        metrics.insert(
+            "inputBatches".to_owned(),
+            (*self.metrics.input_batches).clone(),
+        );
+        metrics.insert("inputRows".to_owned(), (*self.metrics.input_rows).clone());
+        metrics.insert(
+            "outputBatches".to_owned(),
+            (*self.metrics.output_batches).clone(),
+        );
+        metrics.insert("outputRows".to_owned(), (*self.metrics.output_rows).clone());
+        metrics
+    }
 }
 
 /// Updates `hash` with new entries from [RecordBatch] evaluated against the expressions `on`,
@@ -467,22 +514,14 @@ struct HashJoinStream {
     right: SendableRecordBatchStream,
     /// Information of index and left / right placement of columns
     column_indices: Vec<ColumnIndex>,
-    /// number of input batches
-    num_input_batches: usize,
-    /// number of input rows
-    num_input_rows: usize,
-    /// number of batches produced
-    num_output_batches: usize,
-    /// number of rows produced
-    num_output_rows: usize,
-    /// total time for joining probe-side batches to the build-side batches
-    join_time: usize,
     /// Random state used for hashing initialization
     random_state: RandomState,
     /// Keeps track of the left side rows whether they are visited
     visited_left_side: Vec<bool>, // TODO: use a more memory efficient data structure, https://github.com/apache/arrow-datafusion/issues/240
     /// There is nothing to process anymore and left side is processed in case of left join
     is_exhausted: bool,
+    /// Metrics
+    metrics: Arc<HashJoinMetrics>,
 }
 
 #[allow(clippy::too_many_arguments)]
@@ -497,6 +536,7 @@ impl HashJoinStream {
         column_indices: Vec<ColumnIndex>,
         random_state: RandomState,
         visited_left_side: Vec<bool>,
+        metrics: Arc<HashJoinMetrics>,
     ) -> Self {
         HashJoinStream {
             schema,
@@ -506,14 +546,10 @@ impl HashJoinStream {
             left_data,
             right,
             column_indices,
-            num_input_batches: 0,
-            num_input_rows: 0,
-            num_output_batches: 0,
-            num_output_rows: 0,
-            join_time: 0,
             random_state,
             visited_left_side,
             is_exhausted: false,
+            metrics,
         }
     }
 }
@@ -1215,12 +1251,14 @@ impl Stream for HashJoinStream {
                         &self.column_indices,
                         &self.random_state,
                     );
-                    self.num_input_batches += 1;
-                    self.num_input_rows += batch.num_rows();
+                    self.metrics.input_batches.add(1);
+                    self.metrics.input_rows.add(batch.num_rows());
                     if let Ok((ref batch, ref left_side)) = result {
-                        self.join_time += start.elapsed().as_millis() as usize;
-                        self.num_output_batches += 1;
-                        self.num_output_rows += batch.num_rows();
+                        self.metrics
+                            .join_time
+                            .add(start.elapsed().as_millis() as usize);
+                        self.metrics.output_batches.add(1);
+                        self.metrics.output_rows.add(batch.num_rows());
 
                         match self.join_type {
                             JoinType::Left
@@ -1254,13 +1292,14 @@ impl Stream for HashJoinStream {
                                 self.join_type != JoinType::Semi,
                             );
                             if let Ok(ref batch) = result {
-                                self.num_input_batches += 1;
-                                self.num_input_rows += batch.num_rows();
+                                self.metrics.input_batches.add(1);
+                                self.metrics.input_rows.add(batch.num_rows());
                                 if let Ok(ref batch) = result {
-                                    self.join_time +=
-                                        start.elapsed().as_millis() as usize;
-                                    self.num_output_batches += 1;
-                                    self.num_output_rows += batch.num_rows();
+                                    self.metrics
+                                        .join_time
+                                        .add(start.elapsed().as_millis() as usize);
+                                    self.metrics.output_batches.add(1);
+                                    self.metrics.output_rows.add(batch.num_rows());
                                 }
                             }
                             self.is_exhausted = true;
@@ -1274,16 +1313,6 @@ impl Stream for HashJoinStream {
                         | JoinType::Right => {}
                     }
 
-                    // End of right batch, print stats in debug mode
-                    debug!(
-                        "Processed {} probe-side input batches containing {} rows and \
-                        produced {} output batches containing {} rows in {} ms",
-                        self.num_input_batches,
-                        self.num_input_rows,
-                        self.num_output_batches,
-                        self.num_output_rows,
-                        self.join_time
-                    );
                     other
                 }
             })