You are viewing a plain-text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain-text rendering.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/05/28 20:51:48 UTC
[arrow-ballista] branch master updated: Filter field for `JoinNode` and `HashJoinExecNode` (#36)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new 43a28c93 Filter field for `JoinNode` and `HashJoinExecNode` (#36)
43a28c93 is described below
commit 43a28c93e4091514a5107faafd0a2c54f280d347
Author: Eduard Karacharov <13...@users.noreply.github.com>
AuthorDate: Sat May 28 23:51:44 2022 +0300
Filter field for `JoinNode` and `HashJoinExecNode` (#36)
* bump datafusion rev (#35)
* support join filter in ballista serde
* bump datafusion rev
* another datafusion update
* bump datafusion rev
Co-authored-by: Andy Grove <an...@gmail.com>
---
ballista-cli/Cargo.toml | 4 +-
ballista/rust/client/Cargo.toml | 2 +-
ballista/rust/core/Cargo.toml | 4 +-
ballista/rust/core/proto/ballista.proto | 18 +++++++
ballista/rust/core/src/serde/logical_plan/mod.rs | 25 +++++++--
ballista/rust/core/src/serde/mod.rs | 19 +++++++
ballista/rust/core/src/serde/physical_plan/mod.rs | 65 +++++++++++++++++++++++
ballista/rust/executor/Cargo.toml | 2 +-
ballista/rust/scheduler/Cargo.toml | 2 +-
benchmarks/Cargo.toml | 2 +-
examples/Cargo.toml | 2 +-
11 files changed, 133 insertions(+), 12 deletions(-)
diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index 30acbb6d..bed5dd74 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -31,8 +31,8 @@ readme = "README.md"
[dependencies]
ballista = { path = "../ballista/rust/client", version = "0.7.0" }
clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "9ea7dc6036a7b1d28c7450db4f26720b732a50de" }
-datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "9ea7dc6036a7b1d28c7450db4f26720b732a50de" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
+datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
dirs = "4.0.0"
env_logger = "0.9"
mimalloc = { version = "0.1", default-features = false }
diff --git a/ballista/rust/client/Cargo.toml b/ballista/rust/client/Cargo.toml
index 41c6d622..0031be89 100644
--- a/ballista/rust/client/Cargo.toml
+++ b/ballista/rust/client/Cargo.toml
@@ -32,7 +32,7 @@ ballista-core = { path = "../core", version = "0.7.0" }
ballista-executor = { path = "../executor", version = "0.7.0", optional = true }
ballista-scheduler = { path = "../scheduler", version = "0.7.0", optional = true }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "9ea7dc6036a7b1d28c7450db4f26720b732a50de" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
futures = "0.3"
log = "0.4"
parking_lot = "0.12"
diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml
index e3ae3e27..07b5c219 100644
--- a/ballista/rust/core/Cargo.toml
+++ b/ballista/rust/core/Cargo.toml
@@ -39,8 +39,8 @@ arrow-flight = { version = "14.0.0" }
async-trait = "0.1.41"
chrono = { version = "0.4", default-features = false }
clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "9ea7dc6036a7b1d28c7450db4f26720b732a50de" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "9ea7dc6036a7b1d28c7450db4f26720b732a50de" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
futures = "0.3"
hashbrown = "0.12"
diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 1ceb412f..7a53f510 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -236,6 +236,7 @@ message JoinNode {
repeated datafusion.Column left_join_column = 5;
repeated datafusion.Column right_join_column = 6;
bool null_equals_null = 7;
+ datafusion.LogicalExprNode filter = 8;
}
message UnionNode {
@@ -481,6 +482,7 @@ message HashJoinExecNode {
JoinType join_type = 4;
PartitionMode partition_mode = 6;
bool null_equals_null = 7;
+ JoinFilter filter = 8;
}
message UnionExecNode {
@@ -600,6 +602,22 @@ message RepartitionExecNode{
}
}
+message JoinFilter{
+ PhysicalExprNode expression = 1;
+ repeated ColumnIndex column_indices = 2;
+ datafusion.Schema schema = 3;
+}
+
+message ColumnIndex{
+ uint32 index = 1;
+ JoinSide side = 2;
+}
+
+enum JoinSide{
+ LEFT_SIDE = 0;
+ RIGHT_SIDE = 1;
+}
+
///////////////////////////////////////////////////////////////////////////////////////////////////
// Ballista Scheduling
///////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index f088f2f1..39034d94 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -434,6 +434,11 @@ impl AsLogicalPlan for LogicalPlanNode {
join.join_constraint
))
})?;
+ let filter: Option<Expr> = join
+ .filter
+ .as_ref()
+ .map(|expr| parse_expr(expr, ctx))
+ .map_or(Ok(None), |v| v.map(Some))?;
let builder = LogicalPlanBuilder::from(into_logical_plan!(
join.left,
@@ -445,6 +450,7 @@ impl AsLogicalPlan for LogicalPlanNode {
&into_logical_plan!(join.right, ctx, extension_codec)?,
join_type.into(),
(left_keys, right_keys),
+ filter,
)?,
JoinConstraint::Using => builder.join_using(
&into_logical_plan!(join.right, ctx, extension_codec)?,
@@ -695,6 +701,7 @@ impl AsLogicalPlan for LogicalPlanNode {
left,
right,
on,
+ filter,
join_type,
join_constraint,
null_equals_null,
@@ -715,6 +722,11 @@ impl AsLogicalPlan for LogicalPlanNode {
let join_type: protobuf::JoinType = join_type.to_owned().into();
let join_constraint: protobuf::JoinConstraint =
join_constraint.to_owned().into();
+ let filter = filter
+ .as_ref()
+ .map(|e| e.try_into())
+ .map_or(Ok(None), |v| v.map(Some))?;
+
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Join(Box::new(
protobuf::JoinNode {
@@ -725,6 +737,7 @@ impl AsLogicalPlan for LogicalPlanNode {
left_join_column,
right_join_column,
null_equals_null: *null_equals_null,
+ filter,
},
))),
})
@@ -1053,8 +1066,8 @@ mod roundtrip_tests {
},
datasource::listing::ListingTable,
logical_plan::{
- col, CreateExternalTable, Expr, FileType, LogicalPlan, LogicalPlanBuilder,
- Repartition, ToDFSchema,
+ binary_expr, col, CreateExternalTable, Expr, FileType, LogicalPlan,
+ LogicalPlanBuilder, Operator, Repartition, ToDFSchema,
},
prelude::*,
};
@@ -1280,10 +1293,16 @@ mod roundtrip_tests {
let scan_plan = test_scan_csv("employee1", Some(vec![0, 3, 4]))
.await?
.build()?;
+ let filter = binary_expr(col("employee1.x"), Operator::Gt, col("employee2.y"));
let plan = test_scan_csv("employee2", Some(vec![0, 3, 4]))
.await?
- .join(&scan_plan, JoinType::Inner, (vec!["id"], vec!["id"]))?
+ .join(
+ &scan_plan,
+ JoinType::Inner,
+ (vec!["id"], vec!["id"]),
+ Some(filter),
+ )?
.build()?;
roundtrip_test!(plan);
diff --git a/ballista/rust/core/src/serde/mod.rs b/ballista/rust/core/src/serde/mod.rs
index 5a5ebf87..b34b5cb4 100644
--- a/ballista/rust/core/src/serde/mod.rs
+++ b/ballista/rust/core/src/serde/mod.rs
@@ -32,6 +32,7 @@ use crate::{error::BallistaError, serde::scheduler::Action as BallistaAction};
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::logical_plan::plan::Extension;
+use datafusion::physical_plan::join_utils::JoinSide;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use prost::Message;
@@ -315,6 +316,24 @@ impl From<JoinType> for protobuf::JoinType {
}
}
+impl From<protobuf::JoinSide> for JoinSide {
+ fn from(t: protobuf::JoinSide) -> Self {
+ match t {
+ protobuf::JoinSide::LeftSide => JoinSide::Left,
+ protobuf::JoinSide::RightSide => JoinSide::Right,
+ }
+ }
+}
+
+impl From<JoinSide> for protobuf::JoinSide {
+ fn from(t: JoinSide) -> Self {
+ match t {
+ JoinSide::Left => protobuf::JoinSide::LeftSide,
+ JoinSide::Right => protobuf::JoinSide::RightSide,
+ }
+ }
+}
+
impl From<protobuf::JoinConstraint> for JoinConstraint {
fn from(t: protobuf::JoinConstraint) -> Self {
match t {
diff --git a/ballista/rust/core/src/serde/physical_plan/mod.rs b/ballista/rust/core/src/serde/physical_plan/mod.rs
index f5b495b6..33651a06 100644
--- a/ballista/rust/core/src/serde/physical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/physical_plan/mod.rs
@@ -41,6 +41,7 @@ use datafusion::physical_plan::file_format::{
};
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::hash_join::{HashJoinExec, PartitionMode};
+use datafusion::physical_plan::join_utils::{ColumnIndex, JoinFilter};
use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::repartition::RepartitionExec;
@@ -430,6 +431,40 @@ impl AsExecutionPlan for PhysicalPlanNode {
hashjoin.join_type
))
})?;
+ let filter = hashjoin
+ .filter
+ .as_ref()
+ .map(|f| {
+ let expression = parse_physical_expr(
+ f.expression.as_ref().ok_or_else(|| {
+ proto_error("Unexpected empty filter expression")
+ })?,
+ registry,
+ )?;
+ let column_indices = f.column_indices
+ .iter()
+ .map(|i| {
+ let side = protobuf::JoinSide::from_i32(i.side)
+ .ok_or_else(|| proto_error(format!(
+ "Received a HashJoinNode message with JoinSide in Filter {}",
+ i.side))
+ )?;
+
+ Ok(ColumnIndex{
+ index: i.index as usize,
+ side: side.into(),
+ })
+ })
+ .collect::<Result<Vec<_>, BallistaError>>()?;
+ let schema = f
+ .schema
+ .as_ref()
+ .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
+ .try_into()?;
+
+ Ok(JoinFilter::new(expression, column_indices, schema))
+ })
+ .map_or(Ok(None), |v: Result<JoinFilter, BallistaError>| v.map(Some))?;
let partition_mode =
protobuf::PartitionMode::from_i32(hashjoin.partition_mode)
@@ -447,6 +482,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
left,
right,
on,
+ filter,
&join_type.into(),
partition_mode,
&hashjoin.null_equals_null,
@@ -697,6 +733,33 @@ impl AsExecutionPlan for PhysicalPlanNode {
})
.collect();
let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
+ let filter = exec
+ .filter()
+ .as_ref()
+ .map(|f| {
+ let expression = f.expression().to_owned().try_into()?;
+ let column_indices = f
+ .column_indices()
+ .iter()
+ .map(|i| {
+ let side: protobuf::JoinSide = i.side.to_owned().into();
+ protobuf::ColumnIndex {
+ index: i.index as u32,
+ side: side.into(),
+ }
+ })
+ .collect();
+ let schema = f.schema().into();
+ Ok(protobuf::JoinFilter {
+ expression: Some(expression),
+ column_indices,
+ schema: Some(schema),
+ })
+ })
+ .map_or(
+ Ok(None),
+ |v: Result<protobuf::JoinFilter, BallistaError>| v.map(Some),
+ )?;
let partition_mode = match exec.partition_mode() {
PartitionMode::CollectLeft => protobuf::PartitionMode::CollectLeft,
@@ -712,6 +775,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
join_type: join_type.into(),
partition_mode: partition_mode.into(),
null_equals_null: *exec.null_equals_null(),
+ filter,
},
))),
})
@@ -1204,6 +1268,7 @@ mod roundtrip_tests {
Arc::new(EmptyExec::new(false, schema_left.clone())),
Arc::new(EmptyExec::new(false, schema_right.clone())),
on.clone(),
+ None,
join_type,
*partition_mode,
&false,
diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml
index 9d182263..206592a4 100644
--- a/ballista/rust/executor/Cargo.toml
+++ b/ballista/rust/executor/Cargo.toml
@@ -40,7 +40,7 @@ async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.7.0" }
chrono = { version = "0.4", default-features = false }
configure_me = "0.4.0"
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "9ea7dc6036a7b1d28c7450db4f26720b732a50de" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
env_logger = "0.9"
futures = "0.3"
hyper = "0.14.4"
diff --git a/ballista/rust/scheduler/Cargo.toml b/ballista/rust/scheduler/Cargo.toml
index 861489b9..ee01cceb 100644
--- a/ballista/rust/scheduler/Cargo.toml
+++ b/ballista/rust/scheduler/Cargo.toml
@@ -41,7 +41,7 @@ async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.7.0" }
clap = { version = "3", features = ["derive", "cargo"] }
configure_me = "0.4.0"
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "9ea7dc6036a7b1d28c7450db4f26720b732a50de" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
env_logger = "0.9"
etcd-client = { version = "0.9", optional = true }
futures = "0.3"
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 2dd12088..5e59f362 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -33,7 +33,7 @@ snmalloc = ["snmalloc-rs"]
[dependencies]
ballista = { path = "../ballista/rust/client" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "9ea7dc6036a7b1d28c7450db4f26720b732a50de" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
env_logger = "0.9"
futures = "0.3"
mimalloc = { version = "0.1", optional = true, default-features = false }
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index bc815020..f925cf10 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -35,7 +35,7 @@ required-features = ["ballista/standalone"]
[dependencies]
ballista = { path = "../ballista/rust/client", version = "0.7.0" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "9ea7dc6036a7b1d28c7450db4f26720b732a50de" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
futures = "0.3"
num_cpus = "1.13.0"
prost = "0.10"