You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by vi...@apache.org on 2024/02/07 16:48:29 UTC
(arrow-datafusion) branch main updated: Support join filter for `SortMergeJoin` (#9080)
This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 13fdf89ad7 Support join filter for `SortMergeJoin` (#9080)
13fdf89ad7 is described below
commit 13fdf89ad75f46c0887712a410080f11b56988ef
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Wed Feb 7 08:48:23 2024 -0800
Support join filter for `SortMergeJoin` (#9080)
* Support join filter for SortMergeJoin
* Move test
* Fix test
* Fix clippy
* Add outer join tests
* Fix outer join
* For review
* Update datafusion/physical-plan/src/joins/sort_merge_join.rs
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---------
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
.../src/physical_optimizer/enforce_distribution.rs | 5 +
.../src/physical_optimizer/projection_pushdown.rs | 1 +
.../core/src/physical_optimizer/test_utils.rs | 1 +
datafusion/core/src/physical_planner.rs | 26 +-
datafusion/core/tests/fuzz_cases/join_fuzz.rs | 1 +
.../physical-plan/src/joins/sort_merge_join.rs | 219 ++++++++++++++++-
datafusion/sqllogictest/test_files/join.slt | 21 ++
.../sqllogictest/test_files/sort_merge_join.slt | 267 +++++++++++++++++++++
8 files changed, 515 insertions(+), 26 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index fab26c49c2..4f8806a685 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -342,6 +342,7 @@ fn adjust_input_keys_ordering(
left,
right,
on,
+ filter,
join_type,
sort_options,
null_equals_null,
@@ -356,6 +357,7 @@ fn adjust_input_keys_ordering(
left.clone(),
right.clone(),
new_conditions.0,
+ filter.clone(),
*join_type,
new_conditions.1,
*null_equals_null,
@@ -635,6 +637,7 @@ pub(crate) fn reorder_join_keys_to_inputs(
left,
right,
on,
+ filter,
join_type,
sort_options,
null_equals_null,
@@ -664,6 +667,7 @@ pub(crate) fn reorder_join_keys_to_inputs(
left.clone(),
right.clone(),
new_join_on,
+ filter.clone(),
*join_type,
new_sort_options,
*null_equals_null,
@@ -1642,6 +1646,7 @@ pub(crate) mod tests {
left,
right,
join_on.clone(),
+ None,
*join_type,
vec![SortOptions::default(); join_on.len()],
false,
diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
index e638f4a9a8..437d63dad2 100644
--- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -736,6 +736,7 @@ fn try_swapping_with_sort_merge_join(
Arc::new(new_left),
Arc::new(new_right),
new_on,
+ sm_join.filter.clone(),
sm_join.join_type,
sm_join.sort_options.clone(),
sm_join.null_equals_null,
diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs
index 5de6cff0b4..ca7fb78d21 100644
--- a/datafusion/core/src/physical_optimizer/test_utils.rs
+++ b/datafusion/core/src/physical_optimizer/test_utils.rs
@@ -175,6 +175,7 @@ pub fn sort_merge_join_exec(
left,
right,
join_on.clone(),
+ None,
*join_type,
vec![SortOptions::default(); join_on.len()],
false,
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 71be8ec7e8..463d0cde82 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -1114,6 +1114,7 @@ impl DefaultPhysicalPlanner {
};
let prefer_hash_join = session_state.config_options().optimizer.prefer_hash_join;
+
if join_on.is_empty() {
// there is no equal join condition, use the nested loop join
// TODO optimize the plan, and use the config of `target_partitions` and `repartition_joins`
@@ -1129,20 +1130,17 @@ impl DefaultPhysicalPlanner {
{
// Use SortMergeJoin if hash join is not preferred
// Sort-Merge join support currently is experimental
- if join_filter.is_some() {
- // TODO SortMergeJoinExec need to support join filter
- not_impl_err!("SortMergeJoinExec does not support join_filter now.")
- } else {
- let join_on_len = join_on.len();
- Ok(Arc::new(SortMergeJoinExec::try_new(
- physical_left,
- physical_right,
- join_on,
- *join_type,
- vec![SortOptions::default(); join_on_len],
- null_equals_null,
- )?))
- }
+
+ let join_on_len = join_on.len();
+ Ok(Arc::new(SortMergeJoinExec::try_new(
+ physical_left,
+ physical_right,
+ join_on,
+ join_filter,
+ *join_type,
+ vec![SortOptions::default(); join_on_len],
+ null_equals_null,
+ )?))
} else if session_state.config().target_partitions() > 1
&& session_state.config().repartition_joins()
&& prefer_hash_join {
diff --git a/datafusion/core/tests/fuzz_cases/join_fuzz.rs b/datafusion/core/tests/fuzz_cases/join_fuzz.rs
index 1c819ac466..78f8ee7723 100644
--- a/datafusion/core/tests/fuzz_cases/join_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/join_fuzz.rs
@@ -130,6 +130,7 @@ async fn run_join_test(
left,
right,
on_columns.clone(),
+ None,
join_type,
vec![SortOptions::default(), SortOptions::default()],
false,
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs
index 675e90fb63..107fd7dde0 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -33,7 +33,7 @@ use std::task::{Context, Poll};
use crate::expressions::PhysicalSortExpr;
use crate::joins::utils::{
build_join_schema, calculate_join_output_ordering, check_join_is_valid,
- estimate_join_statistics, partitioned_join_output_partitioning, JoinOn,
+ estimate_join_statistics, partitioned_join_output_partitioning, JoinFilter, JoinOn,
};
use crate::metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
use crate::{
@@ -42,6 +42,7 @@ use crate::{
};
use arrow::array::*;
+use arrow::compute;
use arrow::compute::{concat_batches, take, SortOptions};
use arrow::datatypes::{DataType, SchemaRef, TimeUnit};
use arrow::error::ArrowError;
@@ -68,6 +69,8 @@ pub struct SortMergeJoinExec {
pub right: Arc<dyn ExecutionPlan>,
/// Set of common columns used to join on
pub on: JoinOn,
+ /// Filters which are applied while finding matching rows
+ pub filter: Option<JoinFilter>,
/// How the join is performed
pub join_type: JoinType,
/// The schema once the join is applied
@@ -95,6 +98,7 @@ impl SortMergeJoinExec {
left: Arc<dyn ExecutionPlan>,
right: Arc<dyn ExecutionPlan>,
on: JoinOn,
+ filter: Option<JoinFilter>,
join_type: JoinType,
sort_options: Vec<SortOptions>,
null_equals_null: bool,
@@ -150,6 +154,7 @@ impl SortMergeJoinExec {
left,
right,
on,
+ filter,
join_type,
schema,
metrics: ExecutionPlanMetricsSet::new(),
@@ -210,6 +215,11 @@ impl SortMergeJoinExec {
impl DisplayAs for SortMergeJoinExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
+ let display_filter = self.filter.as_ref().map_or_else(
+ || "".to_string(),
+ |f| format!(", filter={}", f.expression()),
+ );
+
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
let on = self
@@ -220,8 +230,8 @@ impl DisplayAs for SortMergeJoinExec {
.join(", ");
write!(
f,
- "SortMergeJoin: join_type={:?}, on=[{}]",
- self.join_type, on
+ "SortMergeJoin: join_type={:?}, on=[{}]{}",
+ self.join_type, on, display_filter
)
}
}
@@ -300,6 +310,7 @@ impl ExecutionPlan for SortMergeJoinExec {
left.clone(),
right.clone(),
self.on.clone(),
+ self.filter.clone(),
self.join_type,
self.sort_options.clone(),
self.null_equals_null,
@@ -349,6 +360,7 @@ impl ExecutionPlan for SortMergeJoinExec {
buffered,
on_streamed,
on_buffered,
+ self.filter.clone(),
self.join_type,
batch_size,
SortMergeJoinMetrics::new(partition, &self.metrics),
@@ -456,8 +468,9 @@ enum BufferedState {
Exhausted,
}
+/// Represents a chunk of joined data from streamed and buffered side
struct StreamedJoinedChunk {
- /// Index of batch buffered_data
+ /// Index of batch in buffered_data
buffered_batch_idx: Option<usize>,
/// Array builder for streamed indices
streamed_indices: UInt64Builder,
@@ -466,13 +479,17 @@ struct StreamedJoinedChunk {
}
struct StreamedBatch {
+ /// The streamed record batch
pub batch: RecordBatch,
+ /// The index of row in the streamed batch to compare with buffered batches
pub idx: usize,
+ /// The join key arrays of streamed batch which are used to compare with buffered batches
+ /// and to produce output. They are produced by evaluating `on` expressions.
pub join_arrays: Vec<ArrayRef>,
- // Chunks of indices from buffered side (may be nulls) joined to streamed
+ /// Chunks of indices from buffered side (may be nulls) joined to streamed
pub output_indices: Vec<StreamedJoinedChunk>,
- // Index of currently scanned batch from buffered data
+ /// Index of currently scanned batch from buffered data
pub buffered_batch_idx: Option<usize>,
}
@@ -505,6 +522,8 @@ impl StreamedBatch {
buffered_batch_idx: Option<usize>,
buffered_idx: Option<usize>,
) {
+ // If no current chunk exists or current chunk is not for current buffered batch,
+ // create a new chunk
if self.output_indices.is_empty() || self.buffered_batch_idx != buffered_batch_idx
{
self.output_indices.push(StreamedJoinedChunk {
@@ -516,6 +535,7 @@ impl StreamedBatch {
};
let current_chunk = self.output_indices.last_mut().unwrap();
+ // Append index of streamed batch and index of buffered batch into current chunk
current_chunk.streamed_indices.append_value(self.idx as u64);
if let Some(idx) = buffered_idx {
current_chunk.buffered_indices.append_value(idx as u64);
@@ -610,9 +630,13 @@ struct SMJStream {
pub on_streamed: Vec<PhysicalExprRef>,
/// Join key columns of buffered
pub on_buffered: Vec<PhysicalExprRef>,
+ /// optional join filter
+ pub filter: Option<JoinFilter>,
/// Staging output array builders
pub output_record_batches: Vec<RecordBatch>,
- /// Staging output size, including output batches and staging joined results
+ /// Staging output size, including output batches and staging joined results.
+ /// Increased when we put rows into buffer and decreased after we actually output batches.
+ /// Used to trigger output when sufficient rows are ready
pub output_size: usize,
/// Target output batch size
pub batch_size: usize,
@@ -736,6 +760,7 @@ impl SMJStream {
buffered: SendableRecordBatchStream,
on_streamed: Vec<Arc<dyn PhysicalExpr>>,
on_buffered: Vec<Arc<dyn PhysicalExpr>>,
+ filter: Option<JoinFilter>,
join_type: JoinType,
batch_size: usize,
join_metrics: SortMergeJoinMetrics,
@@ -761,6 +786,7 @@ impl SMJStream {
current_ordering: Ordering::Equal,
on_streamed,
on_buffered,
+ filter,
output_record_batches: vec![],
output_size: 0,
batch_size,
@@ -943,7 +969,9 @@ impl SMJStream {
/// Produce join and fill output buffer until reaching target batch size
/// or the join is finished
fn join_partial(&mut self) -> Result<()> {
+ // Whether to join streamed rows
let mut join_streamed = false;
+ // Whether to join buffered rows
let mut join_buffered = false;
// determine whether we need to join streamed/buffered rows
@@ -991,11 +1019,13 @@ impl SMJStream {
{
let scanning_idx = self.buffered_data.scanning_idx();
if join_streamed {
+ // Join streamed row and buffered row
self.streamed_batch.append_output_pair(
Some(self.buffered_data.scanning_batch_idx),
Some(scanning_idx),
);
} else {
+ // Join nulls and buffered row
self.buffered_data
.scanning_batch_mut()
.null_joined
@@ -1059,6 +1089,7 @@ impl SMJStream {
}
buffered_batch.null_joined.clear();
+ // Take buffered (right) columns
let buffered_columns = buffered_batch
.batch
.columns()
@@ -1067,6 +1098,7 @@ impl SMJStream {
.collect::<Result<Vec<_>, ArrowError>>()
.map_err(Into::<DataFusionError>::into)?;
+ // Create null streamed (left) columns
let mut streamed_columns = self
.streamed_schema
.fields()
@@ -1121,16 +1153,141 @@ impl SMJStream {
.collect::<Vec<_>>()
};
+ let streamed_columns_length = streamed_columns.len();
+ let buffered_columns_length = buffered_columns.len();
+
+ // Prepare the columns we apply join filter on later.
+ // Only for joined rows between streamed and buffered.
+ let filter_columns = if chunk.buffered_batch_idx.is_some() {
+ if matches!(self.join_type, JoinType::Right) {
+ get_filter_column(&self.filter, &buffered_columns, &streamed_columns)
+ } else {
+ get_filter_column(&self.filter, &streamed_columns, &buffered_columns)
+ }
+ } else {
+ // This chunk is for null joined rows (outer join), we don't need to apply join filter.
+ vec![]
+ };
+
let columns = if matches!(self.join_type, JoinType::Right) {
- buffered_columns.extend(streamed_columns);
+ buffered_columns.extend(streamed_columns.clone());
buffered_columns
} else {
streamed_columns.extend(buffered_columns);
streamed_columns
};
- self.output_record_batches
- .push(RecordBatch::try_new(self.schema.clone(), columns)?);
+ let output_batch =
+ RecordBatch::try_new(self.schema.clone(), columns.clone())?;
+
+ // Apply join filter if any
+ if !filter_columns.is_empty() {
+ if let Some(f) = &self.filter {
+ // Construct batch with only filter columns
+ let filter_batch = RecordBatch::try_new(
+ Arc::new(f.schema().clone()),
+ filter_columns,
+ )?;
+
+ let filter_result = f
+ .expression()
+ .evaluate(&filter_batch)?
+ .into_array(filter_batch.num_rows())?;
+
+ // The selection mask of the filter
+ let mask = datafusion_common::cast::as_boolean_array(&filter_result)?;
+
+ // Push the filtered batch to the output
+ let filtered_batch =
+ compute::filter_record_batch(&output_batch, mask)?;
+ self.output_record_batches.push(filtered_batch);
+
+ // For outer joins, we need to push the null joined rows to the output.
+ if matches!(
+ self.join_type,
+ JoinType::Left | JoinType::Right | JoinType::Full
+ ) {
+ // The reverse of the selection mask. Rows that do not pass the join filter above
+ // must be joined (left or right) with null rows for outer joins.
+ let not_mask = compute::not(mask)?;
+ let null_joined_batch =
+ compute::filter_record_batch(&output_batch, &not_mask)?;
+
+ let mut buffered_columns = self
+ .buffered_schema
+ .fields()
+ .iter()
+ .map(|f| {
+ new_null_array(
+ f.data_type(),
+ null_joined_batch.num_rows(),
+ )
+ })
+ .collect::<Vec<_>>();
+
+ let columns = if matches!(self.join_type, JoinType::Right) {
+ let streamed_columns = null_joined_batch
+ .columns()
+ .iter()
+ .skip(buffered_columns_length)
+ .cloned()
+ .collect::<Vec<_>>();
+
+ buffered_columns.extend(streamed_columns);
+ buffered_columns
+ } else {
+ // Left join or full outer join
+ let mut streamed_columns = null_joined_batch
+ .columns()
+ .iter()
+ .take(streamed_columns_length)
+ .cloned()
+ .collect::<Vec<_>>();
+
+ streamed_columns.extend(buffered_columns);
+ streamed_columns
+ };
+
+ let null_joined_streamed_batch =
+ RecordBatch::try_new(self.schema.clone(), columns.clone())?;
+ self.output_record_batches.push(null_joined_streamed_batch);
+
+ // For full join, we also need to output the null joined rows from the buffered side
+ if matches!(self.join_type, JoinType::Full) {
+ let mut streamed_columns = self
+ .streamed_schema
+ .fields()
+ .iter()
+ .map(|f| {
+ new_null_array(
+ f.data_type(),
+ null_joined_batch.num_rows(),
+ )
+ })
+ .collect::<Vec<_>>();
+
+ let buffered_columns = null_joined_batch
+ .columns()
+ .iter()
+ .skip(streamed_columns_length)
+ .cloned()
+ .collect::<Vec<_>>();
+
+ streamed_columns.extend(buffered_columns);
+
+ let null_joined_buffered_batch = RecordBatch::try_new(
+ self.schema.clone(),
+ streamed_columns,
+ )?;
+ self.output_record_batches.push(null_joined_buffered_batch);
+ }
+ }
+ } else {
+ self.output_record_batches.push(output_batch);
+ }
+ } else {
+ self.output_record_batches.push(output_batch);
+ }
}
self.streamed_batch.output_indices.clear();
@@ -1142,12 +1299,49 @@ impl SMJStream {
let record_batch = concat_batches(&self.schema, &self.output_record_batches)?;
self.join_metrics.output_batches.add(1);
self.join_metrics.output_rows.add(record_batch.num_rows());
- self.output_size -= record_batch.num_rows();
+ // If join filter exists, `self.output_size` is not accurate as we don't know the exact
+ // number of rows in the output record batch. If streamed row joined with buffered rows,
+ // once join filter is applied, the number of output rows may be more than 1.
+ if record_batch.num_rows() > self.output_size {
+ self.output_size = 0;
+ } else {
+ self.output_size -= record_batch.num_rows();
+ }
self.output_record_batches.clear();
Ok(record_batch)
}
}
+/// Gathers the arrays the join filter is evaluated on: every `JoinSide::Left` column (taken from `streamed_columns`) followed by every `JoinSide::Right` column (taken from `buffered_columns`); returns an empty `Vec` when `join_filter` is `None`.
+fn get_filter_column(
+ join_filter: &Option<JoinFilter>,
+ streamed_columns: &[ArrayRef],
+ buffered_columns: &[ArrayRef],
+) -> Vec<ArrayRef> {
+ let mut filter_columns = vec![];
+ // NOTE(review): the `JoinType::Right` call site passes the buffered arrays as `streamed_columns` and the streamed arrays as `buffered_columns`.
+ if let Some(f) = join_filter {
+ let left_columns = f
+ .column_indices()
+ .iter()
+ .filter(|col_index| col_index.side == JoinSide::Left)
+ .map(|i| streamed_columns[i.index].clone())
+ .collect::<Vec<_>>();
+ // Arrays for the filter's right-side columns, looked up by `index` in `buffered_columns`.
+ let right_columns = f
+ .column_indices()
+ .iter()
+ .filter(|col_index| col_index.side == JoinSide::Right)
+ .map(|i| buffered_columns[i.index].clone())
+ .collect::<Vec<_>>();
+ // Left columns precede right columns in the result; presumably this matches the filter schema's column order — TODO confirm.
+ filter_columns.extend(left_columns);
+ filter_columns.extend(right_columns);
+ }
+
+ filter_columns
+}
+
/// Buffered data contains all buffered batches with one unique join key
#[derive(Debug, Default)]
struct BufferedData {
@@ -1498,7 +1692,7 @@ mod tests {
join_type: JoinType,
) -> Result<SortMergeJoinExec> {
let sort_options = vec![SortOptions::default(); on.len()];
- SortMergeJoinExec::try_new(left, right, on, join_type, sort_options, false)
+ SortMergeJoinExec::try_new(left, right, on, None, join_type, sort_options, false)
}
fn join_with_options(
@@ -1513,6 +1707,7 @@ mod tests {
left,
right,
on,
+ None,
join_type,
sort_options,
null_equals_null,
diff --git a/datafusion/sqllogictest/test_files/join.slt b/datafusion/sqllogictest/test_files/join.slt
index e5cbf31c83..a162bf0632 100644
--- a/datafusion/sqllogictest/test_files/join.slt
+++ b/datafusion/sqllogictest/test_files/join.slt
@@ -238,6 +238,27 @@ SELECT t1_int, t2_int, t2_id FROM t1 RIGHT JOIN t2 ON t1_id = t2_id AND t2_int <
NULL 3 11
NULL 3 55
+# equijoin_full
+query ITIITI rowsort
+SELECT * FROM t1 FULL JOIN t2 ON t1_id = t2_id
+----
+11 a 1 11 z 3
+22 b 2 22 y 1
+33 c 3 NULL NULL NULL
+44 d 4 44 x 3
+NULL NULL NULL 55 w 3
+
+# equijoin_full_and_condition_from_both
+query ITIITI rowsort
+SELECT * FROM t1 FULL JOIN t2 ON t1_id = t2_id AND t2_int <= t1_int
+----
+11 a 1 NULL NULL NULL
+22 b 2 22 y 1
+33 c 3 NULL NULL NULL
+44 d 4 44 x 3
+NULL NULL NULL 11 z 3
+NULL NULL NULL 55 w 3
+
# left_join
query ITT rowsort
SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id
diff --git a/datafusion/sqllogictest/test_files/sort_merge_join.slt b/datafusion/sqllogictest/test_files/sort_merge_join.slt
new file mode 100644
index 0000000000..426b9a3a52
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/sort_merge_join.slt
@@ -0,0 +1,267 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+##########
+## Sort Merge Join Tests
+##########
+
+statement ok
+set datafusion.optimizer.prefer_hash_join = false;
+
+statement ok
+CREATE TABLE t1(a text, b int) AS VALUES ('Alice', 50), ('Alice', 100), ('Bob', 1);
+
+statement ok
+CREATE TABLE t2(a text, b int) AS VALUES ('Alice', 2), ('Alice', 1);
+
+# inner join query plan with join filter
+query TT
+EXPLAIN SELECT t1.a, t1.b, t2.a, t2.b FROM t1 JOIN t2 ON t1.a = t2.a AND t2.b * 50 <= t1.b
+----
+logical_plan
+Inner Join: t1.a = t2.a Filter: CAST(t2.b AS Int64) * Int64(50) <= CAST(t1.b AS Int64)
+--TableScan: t1 projection=[a, b]
+--TableScan: t2 projection=[a, b]
+physical_plan
+SortMergeJoin: join_type=Inner, on=[(a@0, a@0)], filter=CAST(b@1 AS Int64) * 50 <= CAST(b@0 AS Int64)
+--SortExec: expr=[a@0 ASC]
+----CoalesceBatchesExec: target_batch_size=8192
+------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1
+--------MemoryExec: partitions=1, partition_sizes=[1]
+--SortExec: expr=[a@0 ASC]
+----CoalesceBatchesExec: target_batch_size=8192
+------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1
+--------MemoryExec: partitions=1, partition_sizes=[1]
+
+# inner join with join filter
+query TITI rowsort
+SELECT t1.a, t1.b, t2.a, t2.b FROM t1 JOIN t2 ON t1.a = t2.a AND t2.b * 50 <= t1.b
+----
+Alice 100 Alice 1
+Alice 100 Alice 2
+Alice 50 Alice 1
+
+query TITI rowsort
+SELECT t1.a, t1.b, t2.a, t2.b FROM t1 JOIN t2 ON t1.a = t2.a AND t2.b < t1.b
+----
+Alice 100 Alice 1
+Alice 100 Alice 2
+Alice 50 Alice 1
+Alice 50 Alice 2
+
+query TITI rowsort
+SELECT t1.a, t1.b, t2.a, t2.b FROM t1 JOIN t2 ON t1.a = t2.a AND t2.b > t1.b
+----
+
+# left join without join filter
+query TITI rowsort
+SELECT * FROM t1 LEFT JOIN t2 ON t1.a = t2.a
+----
+Alice 100 Alice 1
+Alice 100 Alice 2
+Alice 50 Alice 1
+Alice 50 Alice 2
+Bob 1 NULL NULL
+
+# left join with join filter
+query TITI rowsort
+SELECT * FROM t1 LEFT JOIN t2 ON t1.a = t2.a AND t2.b * 50 <= t1.b
+----
+Alice 100 Alice 1
+Alice 100 Alice 2
+Alice 50 Alice 1
+Alice 50 NULL NULL
+Bob 1 NULL NULL
+
+query TITI rowsort
+SELECT * FROM t1 LEFT JOIN t2 ON t1.a = t2.a AND t2.b < t1.b
+----
+Alice 100 Alice 1
+Alice 100 Alice 2
+Alice 50 Alice 1
+Alice 50 Alice 2
+Bob 1 NULL NULL
+
+# right join without join filter
+query TITI rowsort
+SELECT * FROM t1 RIGHT JOIN t2 ON t1.a = t2.a
+----
+Alice 100 Alice 1
+Alice 100 Alice 2
+Alice 50 Alice 1
+Alice 50 Alice 2
+
+# right join with join filter
+query TITI rowsort
+SELECT * FROM t1 RIGHT JOIN t2 ON t1.a = t2.a AND t2.b * 50 <= t1.b
+----
+Alice 100 Alice 1
+Alice 100 Alice 2
+Alice 50 Alice 1
+NULL NULL Alice 2
+
+query TITI rowsort
+SELECT * FROM t1 RIGHT JOIN t2 ON t1.a = t2.a AND t1.b > t2.b
+----
+Alice 100 Alice 1
+Alice 100 Alice 2
+Alice 50 Alice 1
+Alice 50 Alice 2
+
+# full join without join filter
+query TITI rowsort
+SELECT * FROM t1 FULL JOIN t2 ON t1.a = t2.a
+----
+Alice 100 Alice 1
+Alice 100 Alice 2
+Alice 50 Alice 1
+Alice 50 Alice 2
+Bob 1 NULL NULL
+
+# full join with join filter
+query TITI rowsort
+SELECT * FROM t1 FULL JOIN t2 ON t1.a = t2.a AND t2.b * 50 > t1.b
+----
+Alice 100 NULL NULL
+Alice 100 NULL NULL
+Alice 50 Alice 2
+Alice 50 NULL NULL
+Bob 1 NULL NULL
+NULL NULL Alice 1
+NULL NULL Alice 1
+NULL NULL Alice 2
+
+query TITI rowsort
+SELECT * FROM t1 FULL JOIN t2 ON t1.a = t2.a AND t1.b > t2.b + 50
+----
+Alice 100 Alice 1
+Alice 100 Alice 2
+Alice 50 NULL NULL
+Alice 50 NULL NULL
+Bob 1 NULL NULL
+NULL NULL Alice 1
+NULL NULL Alice 2
+
+statement ok
+DROP TABLE t1;
+
+statement ok
+DROP TABLE t2;
+
+statement ok
+CREATE TABLE IF NOT EXISTS t1(t1_id INT, t1_name TEXT, t1_int INT) AS VALUES
+(11, 'a', 1),
+(22, 'b', 2),
+(33, 'c', 3),
+(44, 'd', 4);
+
+statement ok
+CREATE TABLE IF NOT EXISTS t2(t2_id INT, t2_name TEXT, t2_int INT) AS VALUES
+(11, 'z', 3),
+(22, 'y', 1),
+(44, 'x', 3),
+(55, 'w', 3);
+
+# inner join with join filter
+query III rowsort
+SELECT t1_id, t1_int, t2_int FROM t1 JOIN t2 ON t1_id = t2_id AND t1_int >= t2_int
+----
+22 2 1
+44 4 3
+
+# equijoin_multiple_condition_ordering
+query ITT rowsort
+SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t1_id = t2_id AND t1_name <> t2_name
+----
+11 a z
+22 b y
+44 d x
+
+# equijoin_right_and_condition_from_left
+query ITT rowsort
+SELECT t1_id, t1_name, t2_name FROM t1 RIGHT JOIN t2 ON t1_id = t2_id AND t1_id >= 22
+----
+22 b y
+44 d x
+NULL NULL w
+NULL NULL z
+
+# equijoin_left_and_condition_from_left
+query ITT rowsort
+SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id AND t1_id >= 44
+----
+11 a NULL
+22 b NULL
+33 c NULL
+44 d x
+
+# equijoin_left_and_condition_from_both
+query III rowsort
+SELECT t1_id, t1_int, t2_int FROM t1 LEFT JOIN t2 ON t1_id = t2_id AND t1_int >= t2_int
+----
+11 1 NULL
+22 2 1
+33 3 NULL
+44 4 3
+
+# equijoin_right_and_condition_from_right
+query ITT rowsort
+SELECT t1_id, t1_name, t2_name FROM t1 RIGHT JOIN t2 ON t1_id = t2_id AND t2_id >= 22
+----
+22 b y
+44 d x
+NULL NULL w
+NULL NULL z
+
+# equijoin_right_and_condition_from_both
+query III rowsort
+SELECT t1_int, t2_int, t2_id FROM t1 RIGHT JOIN t2 ON t1_id = t2_id AND t2_int <= t1_int
+----
+2 1 22
+4 3 44
+NULL 3 11
+NULL 3 55
+
+# equijoin_full
+query ITIITI rowsort
+SELECT * FROM t1 FULL JOIN t2 ON t1_id = t2_id
+----
+11 a 1 11 z 3
+22 b 2 22 y 1
+33 c 3 NULL NULL NULL
+44 d 4 44 x 3
+NULL NULL NULL 55 w 3
+
+# equijoin_full_and_condition_from_both
+query ITIITI rowsort
+SELECT * FROM t1 FULL JOIN t2 ON t1_id = t2_id AND t2_int <= t1_int
+----
+11 a 1 NULL NULL NULL
+22 b 2 22 y 1
+33 c 3 NULL NULL NULL
+44 d 4 44 x 3
+NULL NULL NULL 11 z 3
+NULL NULL NULL 55 w 3
+
+statement ok
+DROP TABLE t1;
+
+statement ok
+DROP TABLE t2;
+
+statement ok
+set datafusion.optimizer.prefer_hash_join = true;