You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ak...@apache.org on 2023/12/15 14:10:17 UTC

(arrow-datafusion) branch main updated: [MINOR]: Some code changes and a new empty batch guard for SHJ (#8557)

This is an automated email from the ASF dual-hosted git repository.

akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new bf0073c03a [MINOR]: Some code changes and a new empty batch guard for SHJ (#8557)
bf0073c03a is described below

commit bf0073c03ace1e4212f5895c529592d9925bf28d
Author: Metehan Yıldırım <10...@users.noreply.github.com>
AuthorDate: Fri Dec 15 16:10:12 2023 +0200

    [MINOR]: Some code changes and a new empty batch guard for SHJ (#8557)
    
    * minor changes
    
    * Fix imports
    
    ---------
    
    Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
---
 .../physical-plan/src/joins/stream_join_utils.rs   | 83 +++++++++++++++++++++-
 .../physical-plan/src/joins/symmetric_hash_join.rs | 64 +----------------
 2 files changed, 83 insertions(+), 64 deletions(-)

diff --git a/datafusion/physical-plan/src/joins/stream_join_utils.rs b/datafusion/physical-plan/src/joins/stream_join_utils.rs
index 5083f96b01..2f74bd1c4b 100644
--- a/datafusion/physical-plan/src/joins/stream_join_utils.rs
+++ b/datafusion/physical-plan/src/joins/stream_join_utils.rs
@@ -23,8 +23,9 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 use std::usize;
 
-use crate::handle_async_state;
 use crate::joins::utils::{JoinFilter, JoinHashMapType};
+use crate::metrics::{ExecutionPlanMetricsSet, MetricBuilder};
+use crate::{handle_async_state, metrics};
 
 use arrow::compute::concat_batches;
 use arrow_array::{ArrowPrimitiveType, NativeAdapter, PrimitiveArray, RecordBatch};
@@ -824,6 +825,10 @@ pub trait EagerJoinStream {
     ) -> Result<StreamJoinStateResult<Option<RecordBatch>>> {
         match self.right_stream().next().await {
             Some(Ok(batch)) => {
+                if batch.num_rows() == 0 {
+                    return Ok(StreamJoinStateResult::Continue);
+                }
+
                 self.set_state(EagerJoinStreamState::PullLeft);
                 self.process_batch_from_right(batch)
             }
@@ -849,6 +854,9 @@ pub trait EagerJoinStream {
     ) -> Result<StreamJoinStateResult<Option<RecordBatch>>> {
         match self.left_stream().next().await {
             Some(Ok(batch)) => {
+                if batch.num_rows() == 0 {
+                    return Ok(StreamJoinStateResult::Continue);
+                }
                 self.set_state(EagerJoinStreamState::PullRight);
                 self.process_batch_from_left(batch)
             }
@@ -874,7 +882,12 @@ pub trait EagerJoinStream {
         &mut self,
     ) -> Result<StreamJoinStateResult<Option<RecordBatch>>> {
         match self.left_stream().next().await {
-            Some(Ok(batch)) => self.process_batch_after_right_end(batch),
+            Some(Ok(batch)) => {
+                if batch.num_rows() == 0 {
+                    return Ok(StreamJoinStateResult::Continue);
+                }
+                self.process_batch_after_right_end(batch)
+            }
             Some(Err(e)) => Err(e),
             None => {
                 self.set_state(EagerJoinStreamState::BothExhausted {
@@ -899,7 +912,12 @@ pub trait EagerJoinStream {
         &mut self,
     ) -> Result<StreamJoinStateResult<Option<RecordBatch>>> {
         match self.right_stream().next().await {
-            Some(Ok(batch)) => self.process_batch_after_left_end(batch),
+            Some(Ok(batch)) => {
+                if batch.num_rows() == 0 {
+                    return Ok(StreamJoinStateResult::Continue);
+                }
+                self.process_batch_after_left_end(batch)
+            }
             Some(Err(e)) => Err(e),
             None => {
                 self.set_state(EagerJoinStreamState::BothExhausted {
@@ -1020,6 +1038,65 @@ pub trait EagerJoinStream {
     fn state(&mut self) -> EagerJoinStreamState;
 }
 
+#[derive(Debug)]
+pub struct StreamJoinSideMetrics {
+    /// Number of batches consumed by this operator
+    pub(crate) input_batches: metrics::Count,
+    /// Number of rows consumed by this operator
+    pub(crate) input_rows: metrics::Count,
+}
+
+/// Metrics for streaming join operators (e.g. `SymmetricHashJoinExec`)
+#[derive(Debug)]
+pub struct StreamJoinMetrics {
+    /// Number of left batches/rows consumed by this operator
+    pub(crate) left: StreamJoinSideMetrics,
+    /// Number of right batches/rows consumed by this operator
+    pub(crate) right: StreamJoinSideMetrics,
+    /// Memory used by sides in bytes
+    pub(crate) stream_memory_usage: metrics::Gauge,
+    /// Number of batches produced by this operator
+    pub(crate) output_batches: metrics::Count,
+    /// Number of rows produced by this operator
+    pub(crate) output_rows: metrics::Count,
+}
+
+impl StreamJoinMetrics {
+    pub fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
+        let input_batches =
+            MetricBuilder::new(metrics).counter("input_batches", partition);
+        let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
+        let left = StreamJoinSideMetrics {
+            input_batches,
+            input_rows,
+        };
+
+        let input_batches =
+            MetricBuilder::new(metrics).counter("input_batches", partition);
+        let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
+        let right = StreamJoinSideMetrics {
+            input_batches,
+            input_rows,
+        };
+
+        let stream_memory_usage =
+            MetricBuilder::new(metrics).gauge("stream_memory_usage", partition);
+
+        let output_batches =
+            MetricBuilder::new(metrics).counter("output_batches", partition);
+
+        let output_rows = MetricBuilder::new(metrics).output_rows(partition);
+
+        Self {
+            left,
+            right,
+            output_batches,
+            stream_memory_usage,
+            output_rows,
+        }
+    }
+}
+
 #[cfg(test)]
 pub mod tests {
     use std::sync::Arc;
diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
index 95f15877b9..00a7f23eba 100644
--- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
+++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
@@ -37,7 +37,8 @@ use crate::joins::stream_join_utils::{
     calculate_filter_expr_intervals, combine_two_batches,
     convert_sort_expr_with_filter_schema, get_pruning_anti_indices,
     get_pruning_semi_indices, record_visited_indices, EagerJoinStream,
-    EagerJoinStreamState, PruningJoinHashMap, SortedFilterExpr, StreamJoinStateResult,
+    EagerJoinStreamState, PruningJoinHashMap, SortedFilterExpr, StreamJoinMetrics,
+    StreamJoinStateResult,
 };
 use crate::joins::utils::{
     build_batch_from_indices, build_join_schema, check_join_is_valid,
@@ -47,7 +48,7 @@ use crate::joins::utils::{
 use crate::{
     expressions::{Column, PhysicalSortExpr},
     joins::StreamJoinPartitionMode,
-    metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
+    metrics::{ExecutionPlanMetricsSet, MetricsSet},
     DisplayAs, DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan,
     Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics,
 };
@@ -184,65 +185,6 @@ pub struct SymmetricHashJoinExec {
     mode: StreamJoinPartitionMode,
 }
 
-#[derive(Debug)]
-pub struct StreamJoinSideMetrics {
-    /// Number of batches consumed by this operator
-    pub(crate) input_batches: metrics::Count,
-    /// Number of rows consumed by this operator
-    pub(crate) input_rows: metrics::Count,
-}
-
-/// Metrics for HashJoinExec
-#[derive(Debug)]
-pub struct StreamJoinMetrics {
-    /// Number of left batches/rows consumed by this operator
-    pub(crate) left: StreamJoinSideMetrics,
-    /// Number of right batches/rows consumed by this operator
-    pub(crate) right: StreamJoinSideMetrics,
-    /// Memory used by sides in bytes
-    pub(crate) stream_memory_usage: metrics::Gauge,
-    /// Number of batches produced by this operator
-    pub(crate) output_batches: metrics::Count,
-    /// Number of rows produced by this operator
-    pub(crate) output_rows: metrics::Count,
-}
-
-impl StreamJoinMetrics {
-    pub fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
-        let input_batches =
-            MetricBuilder::new(metrics).counter("input_batches", partition);
-        let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
-        let left = StreamJoinSideMetrics {
-            input_batches,
-            input_rows,
-        };
-
-        let input_batches =
-            MetricBuilder::new(metrics).counter("input_batches", partition);
-        let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
-        let right = StreamJoinSideMetrics {
-            input_batches,
-            input_rows,
-        };
-
-        let stream_memory_usage =
-            MetricBuilder::new(metrics).gauge("stream_memory_usage", partition);
-
-        let output_batches =
-            MetricBuilder::new(metrics).counter("output_batches", partition);
-
-        let output_rows = MetricBuilder::new(metrics).output_rows(partition);
-
-        Self {
-            left,
-            right,
-            output_batches,
-            stream_memory_usage,
-            output_rows,
-        }
-    }
-}
-
 impl SymmetricHashJoinExec {
     /// Tries to create a new [SymmetricHashJoinExec].
     /// # Error