Posted to commits@arrow.apache.org by ag...@apache.org on 2022/05/28 20:51:48 UTC

[arrow-ballista] branch master updated: Filter field for `JoinNode` and `HashJoinExecNode` (#36)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new 43a28c93 Filter field for `JoinNode` and `HashJoinExecNode` (#36)
43a28c93 is described below

commit 43a28c93e4091514a5107faafd0a2c54f280d347
Author: Eduard Karacharov <13...@users.noreply.github.com>
AuthorDate: Sat May 28 23:51:44 2022 +0300

    Filter field for `JoinNode` and `HashJoinExecNode` (#36)
    
    * bump datafusion rev (#35)
    
    * support join filter in ballista serde
    
    * datafusion rev
    
    * another datafusion update
    
    * bump datafusion rev
    
    Co-authored-by: Andy Grove <an...@gmail.com>
---
 ballista-cli/Cargo.toml                           |  4 +-
 ballista/rust/client/Cargo.toml                   |  2 +-
 ballista/rust/core/Cargo.toml                     |  4 +-
 ballista/rust/core/proto/ballista.proto           | 18 +++++++
 ballista/rust/core/src/serde/logical_plan/mod.rs  | 25 +++++++--
 ballista/rust/core/src/serde/mod.rs               | 19 +++++++
 ballista/rust/core/src/serde/physical_plan/mod.rs | 65 +++++++++++++++++++++++
 ballista/rust/executor/Cargo.toml                 |  2 +-
 ballista/rust/scheduler/Cargo.toml                |  2 +-
 benchmarks/Cargo.toml                             |  2 +-
 examples/Cargo.toml                               |  2 +-
 11 files changed, 133 insertions(+), 12 deletions(-)
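
This change threads an optional join filter (the non-equijoin predicate of an ON clause) through Ballista's protobuf definitions and plan serde, for both the logical `JoinNode` and the physical `HashJoinExecNode`, and bumps the pinned DataFusion revision that introduced the extra `filter` argument. For illustration only, a minimal sketch of the kind of logical plan that now round-trips with its filter intact; it mirrors the roundtrip test added below, the helper name is hypothetical, and module paths follow the DataFusion revision pinned by this commit:

    use datafusion::error::Result;
    use datafusion::logical_plan::{
        binary_expr, col, JoinType, LogicalPlan, LogicalPlanBuilder, Operator,
    };

    fn inner_join_with_filter(
        left: LogicalPlanBuilder,
        right: &LogicalPlan,
    ) -> Result<LogicalPlan> {
        // Extra predicate that is not part of the equijoin keys; it is what the
        // new JoinNode.filter field carries when the plan is serialized.
        let filter = binary_expr(col("employee1.x"), Operator::Gt, col("employee2.y"));
        left.join(right, JoinType::Inner, (vec!["id"], vec!["id"]), Some(filter))?
            .build()
    }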

diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index 30acbb6d..bed5dd74 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -31,8 +31,8 @@ readme = "README.md"
 [dependencies]
 ballista = { path = "../ballista/rust/client", version = "0.7.0" }
 clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "9ea7dc6036a7b1d28c7450db4f26720b732a50de" }
-datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "9ea7dc6036a7b1d28c7450db4f26720b732a50de" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
+datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
 dirs = "4.0.0"
 env_logger = "0.9"
 mimalloc = { version = "0.1", default-features = false }
diff --git a/ballista/rust/client/Cargo.toml b/ballista/rust/client/Cargo.toml
index 41c6d622..0031be89 100644
--- a/ballista/rust/client/Cargo.toml
+++ b/ballista/rust/client/Cargo.toml
@@ -32,7 +32,7 @@ ballista-core = { path = "../core", version = "0.7.0" }
 ballista-executor = { path = "../executor", version = "0.7.0", optional = true }
 ballista-scheduler = { path = "../scheduler", version = "0.7.0", optional = true }
 
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "9ea7dc6036a7b1d28c7450db4f26720b732a50de" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
 futures = "0.3"
 log = "0.4"
 parking_lot = "0.12"
diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml
index e3ae3e27..07b5c219 100644
--- a/ballista/rust/core/Cargo.toml
+++ b/ballista/rust/core/Cargo.toml
@@ -39,8 +39,8 @@ arrow-flight = { version = "14.0.0" }
 async-trait = "0.1.41"
 chrono = { version = "0.4", default-features = false }
 clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "9ea7dc6036a7b1d28c7450db4f26720b732a50de" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "9ea7dc6036a7b1d28c7450db4f26720b732a50de" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
 futures = "0.3"
 hashbrown = "0.12"
 
diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 1ceb412f..7a53f510 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -236,6 +236,7 @@ message JoinNode {
   repeated datafusion.Column left_join_column = 5;
   repeated datafusion.Column right_join_column = 6;
   bool null_equals_null = 7;
+  datafusion.LogicalExprNode filter = 8;
 }
 
 message UnionNode {
@@ -481,6 +482,7 @@ message HashJoinExecNode {
   JoinType join_type = 4;
   PartitionMode partition_mode = 6;
   bool null_equals_null = 7;
+  JoinFilter filter = 8;
 }
 
 message UnionExecNode {
@@ -600,6 +602,22 @@ message RepartitionExecNode{
   }
 }
 
+message JoinFilter{
+  PhysicalExprNode expression = 1;
+  repeated ColumnIndex column_indices = 2;
+  datafusion.Schema schema = 3;
+}
+
+message ColumnIndex{
+  uint32 index = 1;
+  JoinSide side = 2;
+}
+
+enum JoinSide{
+  LEFT_SIDE = 0;
+  RIGHT_SIDE = 1;
+}
+
 ///////////////////////////////////////////////////////////////////////////////////////////////////
 // Ballista Scheduling
 ///////////////////////////////////////////////////////////////////////////////////////////////////
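
The new messages mirror DataFusion's runtime `JoinFilter`: `expression` is the filter predicate, `schema` is the intermediate schema the predicate is evaluated against, and `column_indices` maps each column of that schema back to the left or right join input via `JoinSide`. As a small sketch (not part of the commit; the helper name is hypothetical), prost carries the `JoinSide` enum field as an `i32` on the wire, written with `.into()` and read back with `from_i32`, which is why the physical-plan decoding below must handle unknown values:

    // `protobuf` is the module prost generates from ballista.proto.
    fn describe_side(index: &protobuf::ColumnIndex) -> String {
        // Enum fields arrive as i32, so decoding must tolerate unknown values.
        match protobuf::JoinSide::from_i32(index.side) {
            Some(protobuf::JoinSide::LeftSide) => {
                format!("column {} of the left join input", index.index)
            }
            Some(protobuf::JoinSide::RightSide) => {
                format!("column {} of the right join input", index.index)
            }
            None => format!("unknown JoinSide value {}", index.side),
        }
    }
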
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index f088f2f1..39034d94 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -434,6 +434,11 @@ impl AsLogicalPlan for LogicalPlanNode {
                         join.join_constraint
                     ))
                 })?;
+                let filter: Option<Expr> = join
+                    .filter
+                    .as_ref()
+                    .map(|expr| parse_expr(expr, ctx))
+                    .map_or(Ok(None), |v| v.map(Some))?;
 
                 let builder = LogicalPlanBuilder::from(into_logical_plan!(
                     join.left,
@@ -445,6 +450,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                         &into_logical_plan!(join.right, ctx, extension_codec)?,
                         join_type.into(),
                         (left_keys, right_keys),
+                        filter,
                     )?,
                     JoinConstraint::Using => builder.join_using(
                         &into_logical_plan!(join.right, ctx, extension_codec)?,
@@ -695,6 +701,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                 left,
                 right,
                 on,
+                filter,
                 join_type,
                 join_constraint,
                 null_equals_null,
@@ -715,6 +722,11 @@ impl AsLogicalPlan for LogicalPlanNode {
                 let join_type: protobuf::JoinType = join_type.to_owned().into();
                 let join_constraint: protobuf::JoinConstraint =
                     join_constraint.to_owned().into();
+                let filter = filter
+                    .as_ref()
+                    .map(|e| e.try_into())
+                    .map_or(Ok(None), |v| v.map(Some))?;
+
                 Ok(protobuf::LogicalPlanNode {
                     logical_plan_type: Some(LogicalPlanType::Join(Box::new(
                         protobuf::JoinNode {
@@ -725,6 +737,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                             left_join_column,
                             right_join_column,
                             null_equals_null: *null_equals_null,
+                            filter,
                         },
                     ))),
                 })
@@ -1053,8 +1066,8 @@ mod roundtrip_tests {
         },
         datasource::listing::ListingTable,
         logical_plan::{
-            col, CreateExternalTable, Expr, FileType, LogicalPlan, LogicalPlanBuilder,
-            Repartition, ToDFSchema,
+            binary_expr, col, CreateExternalTable, Expr, FileType, LogicalPlan,
+            LogicalPlanBuilder, Operator, Repartition, ToDFSchema,
         },
         prelude::*,
     };
@@ -1280,10 +1293,16 @@ mod roundtrip_tests {
         let scan_plan = test_scan_csv("employee1", Some(vec![0, 3, 4]))
             .await?
             .build()?;
+        let filter = binary_expr(col("employee1.x"), Operator::Gt, col("employee2.y"));
 
         let plan = test_scan_csv("employee2", Some(vec![0, 3, 4]))
             .await?
-            .join(&scan_plan, JoinType::Inner, (vec!["id"], vec!["id"]))?
+            .join(
+                &scan_plan,
+                JoinType::Inner,
+                (vec!["id"], vec!["id"]),
+                Some(filter),
+            )?
             .build()?;
 
         roundtrip_test!(plan);
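
The `.map(...).map_or(Ok(None), |v| v.map(Some))?` chain used above (and again in the physical-plan serde) turns an `Option<Result<_>>` inside out into a `Result<Option<_>>`, so `?` can propagate a decoding error while an absent filter stays `None`. A sketch of a drop-in equivalent for the snippet above using the standard library's `Option::transpose`:

    // Equivalent to the map_or(Ok(None), ...) idiom: transpose swaps the
    // Option/Result nesting, and `?` then surfaces any parse error.
    let filter: Option<Expr> = join
        .filter
        .as_ref()
        .map(|expr| parse_expr(expr, ctx))
        .transpose()?;
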
diff --git a/ballista/rust/core/src/serde/mod.rs b/ballista/rust/core/src/serde/mod.rs
index 5a5ebf87..b34b5cb4 100644
--- a/ballista/rust/core/src/serde/mod.rs
+++ b/ballista/rust/core/src/serde/mod.rs
@@ -32,6 +32,7 @@ use crate::{error::BallistaError, serde::scheduler::Action as BallistaAction};
 
 use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::logical_plan::plan::Extension;
+use datafusion::physical_plan::join_utils::JoinSide;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::SessionContext;
 use prost::Message;
@@ -315,6 +316,24 @@ impl From<JoinType> for protobuf::JoinType {
     }
 }
 
+impl From<protobuf::JoinSide> for JoinSide {
+    fn from(t: protobuf::JoinSide) -> Self {
+        match t {
+            protobuf::JoinSide::LeftSide => JoinSide::Left,
+            protobuf::JoinSide::RightSide => JoinSide::Right,
+        }
+    }
+}
+
+impl From<JoinSide> for protobuf::JoinSide {
+    fn from(t: JoinSide) -> Self {
+        match t {
+            JoinSide::Left => protobuf::JoinSide::LeftSide,
+            JoinSide::Right => protobuf::JoinSide::RightSide,
+        }
+    }
+}
+
 impl From<protobuf::JoinConstraint> for JoinConstraint {
     fn from(t: protobuf::JoinConstraint) -> Self {
         match t {
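
With both `From` impls above in place, crossing the serde boundary in either direction is a plain `.into()` call. A usage sketch (not from the commit; the test name is hypothetical):

    #[test]
    fn join_side_roundtrip() {
        use datafusion::physical_plan::join_utils::JoinSide;

        // Runtime -> protobuf when encoding a plan, protobuf -> runtime when decoding.
        let encoded: protobuf::JoinSide = JoinSide::Left.into();
        let decoded: JoinSide = encoded.into();
        assert_eq!(encoded, protobuf::JoinSide::LeftSide);
        assert!(matches!(decoded, JoinSide::Left));
    }
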
diff --git a/ballista/rust/core/src/serde/physical_plan/mod.rs b/ballista/rust/core/src/serde/physical_plan/mod.rs
index f5b495b6..33651a06 100644
--- a/ballista/rust/core/src/serde/physical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/physical_plan/mod.rs
@@ -41,6 +41,7 @@ use datafusion::physical_plan::file_format::{
 };
 use datafusion::physical_plan::filter::FilterExec;
 use datafusion::physical_plan::hash_join::{HashJoinExec, PartitionMode};
+use datafusion::physical_plan::join_utils::{ColumnIndex, JoinFilter};
 use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
 use datafusion::physical_plan::projection::ProjectionExec;
 use datafusion::physical_plan::repartition::RepartitionExec;
@@ -430,6 +431,40 @@ impl AsExecutionPlan for PhysicalPlanNode {
                             hashjoin.join_type
                         ))
                     })?;
+                let filter = hashjoin
+                    .filter
+                    .as_ref()
+                    .map(|f| {
+                        let expression = parse_physical_expr(
+                            f.expression.as_ref().ok_or_else(|| {
+                                proto_error("Unexpected empty filter expression")
+                            })?,
+                            registry,
+                        )?;
+                        let column_indices = f.column_indices
+                            .iter()
+                            .map(|i| {
+                                let side = protobuf::JoinSide::from_i32(i.side)
+                                    .ok_or_else(|| proto_error(format!(
+                                        "Received a HashJoinNode message with unknown JoinSide in Filter {}",
+                                        i.side))
+                                    )?;
+
+                                Ok(ColumnIndex{
+                                    index: i.index as usize,
+                                    side: side.into(),
+                                })
+                            })
+                            .collect::<Result<Vec<_>, BallistaError>>()?;
+                        let schema = f
+                            .schema
+                            .as_ref()
+                            .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
+                            .try_into()?;
+
+                        Ok(JoinFilter::new(expression, column_indices, schema))
+                    })
+                    .map_or(Ok(None), |v: Result<JoinFilter, BallistaError>| v.map(Some))?;
 
                 let partition_mode =
                     protobuf::PartitionMode::from_i32(hashjoin.partition_mode)
@@ -447,6 +482,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
                     left,
                     right,
                     on,
+                    filter,
                     &join_type.into(),
                     partition_mode,
                     &hashjoin.null_equals_null,
@@ -697,6 +733,33 @@ impl AsExecutionPlan for PhysicalPlanNode {
                 })
                 .collect();
             let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
+            let filter = exec
+                .filter()
+                .as_ref()
+                .map(|f| {
+                    let expression = f.expression().to_owned().try_into()?;
+                    let column_indices = f
+                        .column_indices()
+                        .iter()
+                        .map(|i| {
+                            let side: protobuf::JoinSide = i.side.to_owned().into();
+                            protobuf::ColumnIndex {
+                                index: i.index as u32,
+                                side: side.into(),
+                            }
+                        })
+                        .collect();
+                    let schema = f.schema().into();
+                    Ok(protobuf::JoinFilter {
+                        expression: Some(expression),
+                        column_indices,
+                        schema: Some(schema),
+                    })
+                })
+                .map_or(
+                    Ok(None),
+                    |v: Result<protobuf::JoinFilter, BallistaError>| v.map(Some),
+                )?;
 
             let partition_mode = match exec.partition_mode() {
                 PartitionMode::CollectLeft => protobuf::PartitionMode::CollectLeft,
@@ -712,6 +775,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
                         join_type: join_type.into(),
                         partition_mode: partition_mode.into(),
                         null_equals_null: *exec.null_equals_null(),
+                        filter,
                     },
                 ))),
             })
@@ -1204,6 +1268,7 @@ mod roundtrip_tests {
                     Arc::new(EmptyExec::new(false, schema_left.clone())),
                     Arc::new(EmptyExec::new(false, schema_right.clone())),
                     on.clone(),
+                    None,
                     join_type,
                     *partition_mode,
                     &false,
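
For reference, a sketch of the runtime `JoinFilter` that this serde code encodes and reconstructs; the function name, two-column filter schema, and expression are hypothetical, while `ColumnIndex` ties each column of the filter's intermediate schema back to a side of the join exactly as in the code above:

    use std::sync::Arc;
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::logical_plan::Operator;
    use datafusion::physical_plan::expressions::{BinaryExpr, Column};
    use datafusion::physical_plan::join_utils::{ColumnIndex, JoinFilter, JoinSide};

    fn example_join_filter() -> JoinFilter {
        // Intermediate schema the filter expression is evaluated against:
        // one column taken from each join input.
        let filter_schema = Schema::new(vec![
            Field::new("x", DataType::Int64, true),
            Field::new("y", DataType::Int64, true),
        ]);
        // x > y, expressed over the intermediate schema's column positions.
        let expression = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("x", 0)),
            Operator::Gt,
            Arc::new(Column::new("y", 1)),
        ));
        let column_indices = vec![
            ColumnIndex { index: 0, side: JoinSide::Left },  // "x" is column 0 of the left input
            ColumnIndex { index: 1, side: JoinSide::Right }, // "y" is column 1 of the right input
        ];
        JoinFilter::new(expression, column_indices, filter_schema)
    }
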
diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml
index 9d182263..206592a4 100644
--- a/ballista/rust/executor/Cargo.toml
+++ b/ballista/rust/executor/Cargo.toml
@@ -40,7 +40,7 @@ async-trait = "0.1.41"
 ballista-core = { path = "../core", version = "0.7.0" }
 chrono = { version = "0.4", default-features = false }
 configure_me = "0.4.0"
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "9ea7dc6036a7b1d28c7450db4f26720b732a50de" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
 env_logger = "0.9"
 futures = "0.3"
 hyper = "0.14.4"
diff --git a/ballista/rust/scheduler/Cargo.toml b/ballista/rust/scheduler/Cargo.toml
index 861489b9..ee01cceb 100644
--- a/ballista/rust/scheduler/Cargo.toml
+++ b/ballista/rust/scheduler/Cargo.toml
@@ -41,7 +41,7 @@ async-trait = "0.1.41"
 ballista-core = { path = "../core", version = "0.7.0" }
 clap = { version = "3", features = ["derive", "cargo"] }
 configure_me = "0.4.0"
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "9ea7dc6036a7b1d28c7450db4f26720b732a50de" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
 env_logger = "0.9"
 etcd-client = { version = "0.9", optional = true }
 futures = "0.3"
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 2dd12088..5e59f362 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -33,7 +33,7 @@ snmalloc = ["snmalloc-rs"]
 
 [dependencies]
 ballista = { path = "../ballista/rust/client" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "9ea7dc6036a7b1d28c7450db4f26720b732a50de" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
 env_logger = "0.9"
 futures = "0.3"
 mimalloc = { version = "0.1", optional = true, default-features = false }
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index bc815020..f925cf10 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -35,7 +35,7 @@ required-features = ["ballista/standalone"]
 
 [dependencies]
 ballista = { path = "../ballista/rust/client", version = "0.7.0" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "9ea7dc6036a7b1d28c7450db4f26720b732a50de" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
 futures = "0.3"
 num_cpus = "1.13.0"
 prost = "0.10"