You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/06/07 13:20:07 UTC
[arrow-datafusion] branch main updated: Move JoinType to datafusion_common (#6572)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new cbfb7c8d58 Move JoinType to datafusion_common (#6572)
cbfb7c8d58 is described below
commit cbfb7c8d5856aa252b6a704a8c3e438ba720999b
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Wed Jun 7 09:20:00 2023 -0400
Move JoinType to datafusion_common (#6572)
---
datafusion/common/src/join_type.rs | 98 ++++++++++++++++++++++
datafusion/common/src/lib.rs | 2 +
.../core/src/physical_plan/joins/hash_join.rs | 2 +-
.../src/physical_plan/joins/sort_merge_join.rs | 4 +-
.../src/physical_plan/joins/symmetric_hash_join.rs | 2 +-
datafusion/core/src/physical_plan/joins/utils.rs | 2 +-
datafusion/expr/src/logical_plan/plan.rs | 77 +----------------
7 files changed, 108 insertions(+), 79 deletions(-)
diff --git a/datafusion/common/src/join_type.rs b/datafusion/common/src/join_type.rs
new file mode 100644
index 0000000000..9da9e5625f
--- /dev/null
+++ b/datafusion/common/src/join_type.rs
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines [`JoinType`] (the kind of join) and [`JoinConstraint`] (`ON` vs `USING`)
+
+use std::{
+ fmt::{self, Display, Formatter},
+ str::FromStr,
+};
+
+use crate::{DataFusionError, Result};
+
+/// The kind of join; shown by `Display` as the variant name and parsed case-insensitively by `FromStr`
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub enum JoinType {
+ /// Inner Join
+ Inner,
+ /// Left Join (counted as an outer join by `is_outer`)
+ Left,
+ /// Right Join (counted as an outer join by `is_outer`)
+ Right,
+ /// Full Join (counted as an outer join by `is_outer`)
+ Full,
+ /// Left Semi Join
+ LeftSemi,
+ /// Right Semi Join
+ RightSemi,
+ /// Left Anti Join
+ LeftAnti,
+ /// Right Anti Join
+ RightAnti,
+}
+
+impl JoinType {
+ pub fn is_outer(self) -> bool { // true only for Left, Right and Full; Inner and the semi/anti variants are not outer
+ self == JoinType::Left || self == JoinType::Right || self == JoinType::Full
+ }
+}
+
+impl Display for JoinType {
+ fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+ let join_type = match self { // every variant prints as its own name, e.g. LeftSemi
+ JoinType::Inner => "Inner",
+ JoinType::Left => "Left",
+ JoinType::Right => "Right",
+ JoinType::Full => "Full",
+ JoinType::LeftSemi => "LeftSemi",
+ JoinType::RightSemi => "RightSemi",
+ JoinType::LeftAnti => "LeftAnti",
+ JoinType::RightAnti => "RightAnti",
+ };
+ write!(f, "{join_type}") // these names parse back through FromStr, which ignores letter case
+ }
+}
+
+impl FromStr for JoinType {
+ type Err = DataFusionError;
+
+ fn from_str(s: &str) -> Result<Self> {
+ let s = s.to_uppercase(); // compare ignoring case; this shadows the input, so the error below shows the uppercased text
+ match s.as_str() {
+ "INNER" => Ok(JoinType::Inner),
+ "LEFT" => Ok(JoinType::Left),
+ "RIGHT" => Ok(JoinType::Right),
+ "FULL" => Ok(JoinType::Full),
+ "LEFTSEMI" => Ok(JoinType::LeftSemi),
+ "RIGHTSEMI" => Ok(JoinType::RightSemi),
+ "LEFTANTI" => Ok(JoinType::LeftAnti),
+ "RIGHTANTI" => Ok(JoinType::RightAnti),
+ _ => Err(DataFusionError::NotImplemented(format!( // anything else, including names split by spaces or underscores, is rejected
+ "The join type {s} does not exist or is not implemented"
+ ))),
+ }
+ }
+}
+
+/// Join constraint: whether the join is expressed with `ON` or with `USING`
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub enum JoinConstraint {
+ /// Join ON
+ On,
+ /// Join USING
+ Using,
+}
diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs
index 7bbb3fbb71..f58964bc22 100644
--- a/datafusion/common/src/lib.rs
+++ b/datafusion/common/src/lib.rs
@@ -22,6 +22,7 @@ pub mod delta;
mod dfschema;
mod error;
pub mod from_slice;
+mod join_type;
pub mod parsers;
#[cfg(feature = "pyarrow")]
mod pyarrow;
@@ -39,6 +40,7 @@ pub use error::{
field_not_found, unqualified_field_not_found, DataFusionError, Result, SchemaError,
SharedResult,
};
+pub use join_type::{JoinConstraint, JoinType};
pub use scalar::{ScalarType, ScalarValue};
pub use schema_reference::{OwnedSchemaReference, SchemaReference};
pub use stats::{ColumnStatistics, Statistics};
diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs
index 2c8f8b55da..a6cb1c6f82 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -57,7 +57,6 @@ use crate::arrow::array::BooleanBufferBuilder;
use crate::arrow::datatypes::TimeUnit;
use crate::error::{DataFusionError, Result};
use crate::execution::{context::TaskContext, memory_pool::MemoryConsumer};
-use crate::logical_expr::JoinType;
use crate::physical_plan::joins::utils::{
adjust_indices_by_join_type, apply_join_filter_to_indices, build_batch_from_indices,
get_final_indices_from_bit_map, need_produce_result_in_final, JoinSide,
@@ -78,6 +77,7 @@ use crate::physical_plan::{
DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, Partitioning,
PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, Statistics,
};
+use datafusion_common::JoinType;
use super::{
utils::{OnceAsync, OnceFut},
diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
index 85bd18e592..67e111fd46 100644
--- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
+++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
@@ -40,7 +40,6 @@ use futures::{Stream, StreamExt};
use crate::error::DataFusionError;
use crate::error::Result;
-use crate::logical_expr::JoinType;
use crate::physical_plan::expressions::Column;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::joins::utils::{
@@ -52,6 +51,7 @@ use crate::physical_plan::{
metrics, DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan,
Partitioning, PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, Statistics,
};
+use datafusion_common::JoinType;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::TaskContext;
@@ -1396,7 +1396,6 @@ mod tests {
use crate::common::assert_contains;
use crate::error::Result;
- use crate::logical_expr::JoinType;
use crate::physical_plan::expressions::Column;
use crate::physical_plan::joins::utils::JoinOn;
use crate::physical_plan::joins::SortMergeJoinExec;
@@ -1405,6 +1404,7 @@ mod tests {
use crate::prelude::{SessionConfig, SessionContext};
use crate::test::{build_table_i32, columns};
use crate::{assert_batches_eq, assert_batches_sorted_eq};
+ use datafusion_common::JoinType;
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
fn build_table(
diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
index b5c5b06c3f..eaaec759b9 100644
--- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
@@ -51,7 +51,6 @@ use datafusion_execution::memory_pool::MemoryConsumer;
use datafusion_physical_expr::intervals::{ExprIntervalGraph, Interval, IntervalBound};
use crate::error::{DataFusionError, Result};
-use crate::logical_expr::JoinType;
use crate::physical_plan::common::SharedMemoryReservation;
use crate::physical_plan::joins::hash_join_utils::convert_sort_expr_with_filter_schema;
use crate::physical_plan::joins::hash_join_utils::JoinHashMap;
@@ -71,6 +70,7 @@ use crate::physical_plan::{
DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, Partitioning,
RecordBatchStream, SendableRecordBatchStream, Statistics,
};
+use datafusion_common::JoinType;
use datafusion_execution::TaskContext;
const HASHMAP_SHRINK_SCALE_FACTOR: usize = 4;
diff --git a/datafusion/core/src/physical_plan/joins/utils.rs b/datafusion/core/src/physical_plan/joins/utils.rs
index 412e774806..88da4726ec 100644
--- a/datafusion/core/src/physical_plan/joins/utils.rs
+++ b/datafusion/core/src/physical_plan/joins/utils.rs
@@ -42,8 +42,8 @@ use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_physical_expr::{EquivalentClass, PhysicalExpr};
use crate::error::{DataFusionError, Result};
-use crate::logical_expr::JoinType;
use crate::physical_plan::expressions::Column;
+use datafusion_common::JoinType;
use crate::physical_plan::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder};
use crate::physical_plan::SchemaRef;
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index 798a54273c..e19b327785 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -40,9 +40,11 @@ use datafusion_common::{
use std::collections::{HashMap, HashSet};
use std::fmt::{self, Debug, Display, Formatter};
use std::hash::{Hash, Hasher};
-use std::str::FromStr;
use std::sync::Arc;
+// Re-exported for backwards compatibility: these types now live in `datafusion_common`
+pub use datafusion_common::{JoinConstraint, JoinType};
+
use super::DdlStatement;
/// A LogicalPlan represents the different types of relational
@@ -1128,79 +1130,6 @@ impl ToStringifiedPlan for LogicalPlan {
}
}
-/// Join type
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
-pub enum JoinType {
- /// Inner Join
- Inner,
- /// Left Join
- Left,
- /// Right Join
- Right,
- /// Full Join
- Full,
- /// Left Semi Join
- LeftSemi,
- /// Right Semi Join
- RightSemi,
- /// Left Anti Join
- LeftAnti,
- /// Right Anti Join
- RightAnti,
-}
-
-impl JoinType {
- pub fn is_outer(self) -> bool {
- self == JoinType::Left || self == JoinType::Right || self == JoinType::Full
- }
-}
-
-impl Display for JoinType {
- fn fmt(&self, f: &mut Formatter) -> fmt::Result {
- let join_type = match self {
- JoinType::Inner => "Inner",
- JoinType::Left => "Left",
- JoinType::Right => "Right",
- JoinType::Full => "Full",
- JoinType::LeftSemi => "LeftSemi",
- JoinType::RightSemi => "RightSemi",
- JoinType::LeftAnti => "LeftAnti",
- JoinType::RightAnti => "RightAnti",
- };
- write!(f, "{join_type}")
- }
-}
-
-impl FromStr for JoinType {
- type Err = DataFusionError;
-
- fn from_str(s: &str) -> Result<Self> {
- let s = s.to_uppercase();
- match s.as_str() {
- "INNER" => Ok(JoinType::Inner),
- "LEFT" => Ok(JoinType::Left),
- "RIGHT" => Ok(JoinType::Right),
- "FULL" => Ok(JoinType::Full),
- "LEFTSEMI" => Ok(JoinType::LeftSemi),
- "RIGHTSEMI" => Ok(JoinType::RightSemi),
- "LEFTANTI" => Ok(JoinType::LeftAnti),
- "RIGHTANTI" => Ok(JoinType::RightAnti),
- _ => Err(DataFusionError::NotImplemented(format!(
- "The join type {s} does not exist or is not implemented"
- ))),
- }
- }
-}
-
-/// Join constraint
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
-pub enum JoinConstraint {
- /// Join ON
- On,
- /// Join USING
- Using,
-}
-
/// Produces no rows: An empty relation with an empty schema
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct EmptyRelation {