You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ak...@apache.org on 2023/12/15 14:10:17 UTC
(arrow-datafusion) branch main updated: [MINOR]: Some code changes and a new empty batch guard for SHJ (#8557)
This is an automated email from the ASF dual-hosted git repository.
akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new bf0073c03a [MINOR]: Some code changes and a new empty batch guard for SHJ (#8557)
bf0073c03a is described below
commit bf0073c03ace1e4212f5895c529592d9925bf28d
Author: Metehan Yıldırım <10...@users.noreply.github.com>
AuthorDate: Fri Dec 15 16:10:12 2023 +0200
[MINOR]: Some code changes and a new empty batch guard for SHJ (#8557)
* minor changes
* Fix imports
---------
Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
---
.../physical-plan/src/joins/stream_join_utils.rs | 83 +++++++++++++++++++++-
.../physical-plan/src/joins/symmetric_hash_join.rs | 64 +----------------
2 files changed, 83 insertions(+), 64 deletions(-)
diff --git a/datafusion/physical-plan/src/joins/stream_join_utils.rs b/datafusion/physical-plan/src/joins/stream_join_utils.rs
index 5083f96b01..2f74bd1c4b 100644
--- a/datafusion/physical-plan/src/joins/stream_join_utils.rs
+++ b/datafusion/physical-plan/src/joins/stream_join_utils.rs
@@ -23,8 +23,9 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use std::usize;
-use crate::handle_async_state;
use crate::joins::utils::{JoinFilter, JoinHashMapType};
+use crate::metrics::{ExecutionPlanMetricsSet, MetricBuilder};
+use crate::{handle_async_state, metrics};
use arrow::compute::concat_batches;
use arrow_array::{ArrowPrimitiveType, NativeAdapter, PrimitiveArray, RecordBatch};
@@ -824,6 +825,10 @@ pub trait EagerJoinStream {
) -> Result<StreamJoinStateResult<Option<RecordBatch>>> {
match self.right_stream().next().await {
Some(Ok(batch)) => {
+ if batch.num_rows() == 0 {
+ return Ok(StreamJoinStateResult::Continue);
+ }
+
self.set_state(EagerJoinStreamState::PullLeft);
self.process_batch_from_right(batch)
}
@@ -849,6 +854,9 @@ pub trait EagerJoinStream {
) -> Result<StreamJoinStateResult<Option<RecordBatch>>> {
match self.left_stream().next().await {
Some(Ok(batch)) => {
+ if batch.num_rows() == 0 {
+ return Ok(StreamJoinStateResult::Continue);
+ }
self.set_state(EagerJoinStreamState::PullRight);
self.process_batch_from_left(batch)
}
@@ -874,7 +882,12 @@ pub trait EagerJoinStream {
&mut self,
) -> Result<StreamJoinStateResult<Option<RecordBatch>>> {
match self.left_stream().next().await {
- Some(Ok(batch)) => self.process_batch_after_right_end(batch),
+ Some(Ok(batch)) => {
+ if batch.num_rows() == 0 {
+ return Ok(StreamJoinStateResult::Continue);
+ }
+ self.process_batch_after_right_end(batch)
+ }
Some(Err(e)) => Err(e),
None => {
self.set_state(EagerJoinStreamState::BothExhausted {
@@ -899,7 +912,12 @@ pub trait EagerJoinStream {
&mut self,
) -> Result<StreamJoinStateResult<Option<RecordBatch>>> {
match self.right_stream().next().await {
- Some(Ok(batch)) => self.process_batch_after_left_end(batch),
+ Some(Ok(batch)) => {
+ if batch.num_rows() == 0 {
+ return Ok(StreamJoinStateResult::Continue);
+ }
+ self.process_batch_after_left_end(batch)
+ }
Some(Err(e)) => Err(e),
None => {
self.set_state(EagerJoinStreamState::BothExhausted {
@@ -1020,6 +1038,65 @@ pub trait EagerJoinStream {
fn state(&mut self) -> EagerJoinStreamState;
}
+/// Input metrics for one side (left or right) of a stream join.
+#[derive(Debug)]
+pub struct StreamJoinSideMetrics {
+ /// Number of batches consumed by this operator from this side's input
+ pub(crate) input_batches: metrics::Count,
+ /// Number of rows consumed by this operator from this side's input
+ pub(crate) input_rows: metrics::Count,
+}
+
+/// Execution metrics for stream join operators such as `SymmetricHashJoinExec`.
+///
+/// Tracks per-side input volume, memory usage, and output volume.
+#[derive(Debug)]
+pub struct StreamJoinMetrics {
+ /// Number of left batches/rows consumed by this operator
+ pub(crate) left: StreamJoinSideMetrics,
+ /// Number of right batches/rows consumed by this operator
+ pub(crate) right: StreamJoinSideMetrics,
+ /// Memory used by the join sides, in bytes (a gauge, so it can go up and down)
+ pub(crate) stream_memory_usage: metrics::Gauge,
+ /// Number of batches produced by this operator
+ pub(crate) output_batches: metrics::Count,
+ /// Number of rows produced by this operator
+ pub(crate) output_rows: metrics::Count,
+}
+
+impl StreamJoinMetrics {
+ /// Registers every stream join metric for `partition` in `metrics` and
+ /// returns a struct holding the handles to them.
+ pub fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
+ // NOTE(review): the left and right sides below register counters under the
+ // same names ("input_batches", "input_rows") for the same partition, so the
+ // two sides cannot be told apart by name in the metrics set and are
+ // presumably summed by any name-based aggregation. Distinct names (e.g.
+ // "left_input_batches") would fix that but change the reported metric
+ // names; confirm nothing depends on the current names before renaming.
+ let input_batches =
+ MetricBuilder::new(metrics).counter("input_batches", partition);
+ let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
+ let left = StreamJoinSideMetrics {
+ input_batches,
+ input_rows,
+ };
+
+ // Right side: registered under the same names as the left side (see note above).
+ let input_batches =
+ MetricBuilder::new(metrics).counter("input_batches", partition);
+ let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
+ let right = StreamJoinSideMetrics {
+ input_batches,
+ input_rows,
+ };
+
+ let stream_memory_usage =
+ MetricBuilder::new(metrics).gauge("stream_memory_usage", partition);
+
+ let output_batches =
+ MetricBuilder::new(metrics).counter("output_batches", partition);
+
+ // Output rows use the builder's dedicated `output_rows` helper instead of
+ // a custom-named counter like the metrics above.
+ let output_rows = MetricBuilder::new(metrics).output_rows(partition);
+
+ Self {
+ left,
+ right,
+ output_batches,
+ stream_memory_usage,
+ output_rows,
+ }
+ }
+}
+
#[cfg(test)]
pub mod tests {
use std::sync::Arc;
diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
index 95f15877b9..00a7f23eba 100644
--- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
+++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
@@ -37,7 +37,8 @@ use crate::joins::stream_join_utils::{
calculate_filter_expr_intervals, combine_two_batches,
convert_sort_expr_with_filter_schema, get_pruning_anti_indices,
get_pruning_semi_indices, record_visited_indices, EagerJoinStream,
- EagerJoinStreamState, PruningJoinHashMap, SortedFilterExpr, StreamJoinStateResult,
+ EagerJoinStreamState, PruningJoinHashMap, SortedFilterExpr, StreamJoinMetrics,
+ StreamJoinStateResult,
};
use crate::joins::utils::{
build_batch_from_indices, build_join_schema, check_join_is_valid,
@@ -47,7 +48,7 @@ use crate::joins::utils::{
use crate::{
expressions::{Column, PhysicalSortExpr},
joins::StreamJoinPartitionMode,
- metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
+ metrics::{ExecutionPlanMetricsSet, MetricsSet},
DisplayAs, DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan,
Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics,
};
@@ -184,65 +185,6 @@ pub struct SymmetricHashJoinExec {
mode: StreamJoinPartitionMode,
}
-#[derive(Debug)]
-pub struct StreamJoinSideMetrics {
- /// Number of batches consumed by this operator
- pub(crate) input_batches: metrics::Count,
- /// Number of rows consumed by this operator
- pub(crate) input_rows: metrics::Count,
-}
-
-/// Metrics for HashJoinExec
-#[derive(Debug)]
-pub struct StreamJoinMetrics {
- /// Number of left batches/rows consumed by this operator
- pub(crate) left: StreamJoinSideMetrics,
- /// Number of right batches/rows consumed by this operator
- pub(crate) right: StreamJoinSideMetrics,
- /// Memory used by sides in bytes
- pub(crate) stream_memory_usage: metrics::Gauge,
- /// Number of batches produced by this operator
- pub(crate) output_batches: metrics::Count,
- /// Number of rows produced by this operator
- pub(crate) output_rows: metrics::Count,
-}
-
-impl StreamJoinMetrics {
- pub fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
- let input_batches =
- MetricBuilder::new(metrics).counter("input_batches", partition);
- let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
- let left = StreamJoinSideMetrics {
- input_batches,
- input_rows,
- };
-
- let input_batches =
- MetricBuilder::new(metrics).counter("input_batches", partition);
- let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
- let right = StreamJoinSideMetrics {
- input_batches,
- input_rows,
- };
-
- let stream_memory_usage =
- MetricBuilder::new(metrics).gauge("stream_memory_usage", partition);
-
- let output_batches =
- MetricBuilder::new(metrics).counter("output_batches", partition);
-
- let output_rows = MetricBuilder::new(metrics).output_rows(partition);
-
- Self {
- left,
- right,
- output_batches,
- stream_memory_usage,
- output_rows,
- }
- }
-}
-
impl SymmetricHashJoinExec {
/// Tries to create a new [SymmetricHashJoinExec].
/// # Error