You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2022/10/28 21:45:04 UTC

[arrow-ballista] branch master updated: Bump DataFusion (#471)

This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new 197ad018 Bump DataFusion (#471)
197ad018 is described below

commit 197ad01858007a5c16901f7e69d7158213c2bda2
Author: Daniël Heres <da...@gmail.com>
AuthorDate: Fri Oct 28 23:44:57 2022 +0200

    Bump DataFusion (#471)
    
    * Bump datafusion
    
    * Proto update
    
    * Update with right semi
    
    * Update datafusion.proto
    
    * Update rev
---
 ballista-cli/Cargo.toml                      |   4 +-
 ballista/client/Cargo.toml                   |   4 +-
 ballista/core/Cargo.toml                     |   4 +-
 ballista/core/proto/datafusion.proto         | 254 ++++++++++++---------------
 ballista/core/src/serde/physical_plan/mod.rs |   5 +-
 ballista/executor/Cargo.toml                 |   4 +-
 ballista/scheduler/Cargo.toml                |   4 +-
 benchmarks/Cargo.toml                        |   4 +-
 examples/Cargo.toml                          |   2 +-
 python/Cargo.toml                            |   2 +-
 python/src/dataframe.rs                      |   5 +-
 11 files changed, 131 insertions(+), 161 deletions(-)

diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index f0fbaad6..c7af9a41 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -33,8 +33,8 @@ ballista = { path = "../ballista/client", version = "0.9.0", features = [
     "standalone",
 ] }
 clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "fa5cd7f30d785c5b3355e425e082a9d5a91bf567" }
-datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "fa5cd7f30d785c5b3355e425e082a9d5a91bf567" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "925a96225e2142d8adc53a3323ddf612fffe2007" }
+datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "925a96225e2142d8adc53a3323ddf612fffe2007" }
 dirs = "4.0.0"
 env_logger = "0.9"
 mimalloc = { version = "0.1", default-features = false }
diff --git a/ballista/client/Cargo.toml b/ballista/client/Cargo.toml
index 33eb9a53..8eaaf301 100644
--- a/ballista/client/Cargo.toml
+++ b/ballista/client/Cargo.toml
@@ -31,8 +31,8 @@ rust-version = "1.59"
 ballista-core = { path = "../core", version = "0.9.0" }
 ballista-executor = { path = "../executor", version = "0.9.0", optional = true }
 ballista-scheduler = { path = "../scheduler", version = "0.9.0", optional = true }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "fa5cd7f30d785c5b3355e425e082a9d5a91bf567" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "fa5cd7f30d785c5b3355e425e082a9d5a91bf567" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "925a96225e2142d8adc53a3323ddf612fffe2007" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "925a96225e2142d8adc53a3323ddf612fffe2007" }
 futures = "0.3"
 log = "0.4"
 parking_lot = "0.12"
diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml
index 2d710da9..91be1f09 100644
--- a/ballista/core/Cargo.toml
+++ b/ballista/core/Cargo.toml
@@ -46,9 +46,9 @@ arrow-flight = { version = "25.0.0", features = ["flight-sql-experimental"] }
 async-trait = "0.1.41"
 chrono = { version = "0.4", default-features = false }
 clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "fa5cd7f30d785c5b3355e425e082a9d5a91bf567" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "925a96225e2142d8adc53a3323ddf612fffe2007" }
 datafusion-objectstore-hdfs = { version = "0.1.1", default-features = false, optional = true }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "fa5cd7f30d785c5b3355e425e082a9d5a91bf567" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "925a96225e2142d8adc53a3323ddf612fffe2007" }
 futures = "0.3"
 hashbrown = "0.12"
 
diff --git a/ballista/core/proto/datafusion.proto b/ballista/core/proto/datafusion.proto
index b7ae6360..3bbf6fa1 100644
--- a/ballista/core/proto/datafusion.proto
+++ b/ballista/core/proto/datafusion.proto
@@ -230,8 +230,9 @@ enum JoinType {
   LEFT = 1;
   RIGHT = 2;
   FULL = 3;
-  SEMI = 4;
-  ANTI = 5;
+  LEFTSEMI = 4;
+  LEFTANTI = 5;
+  RIGHTSEMI = 6;
 }
 
 enum JoinConstraint {
@@ -648,11 +649,7 @@ enum WindowFrameBoundType {
 
 message WindowFrameBound {
   WindowFrameBoundType window_frame_bound_type = 1;
-  // "optional" keyword is stable in protoc 3.15 but prost is still on 3.14 (see https://github.com/tokio-rs/prost/issues/430 and https://github.com/tokio-rs/prost/pull/455)
-  // this syntax is ugly but is binary compatible with the "optional" keyword (see https://stackoverflow.com/questions/42622015/how-to-define-an-optional-field-in-protobuf-3)
-  oneof bound_value {
-    uint64 value = 2;
-  }
+  ScalarValue bound_value = 2;
 }
 
 ///////////////////////////////////////////////////////////////////////////////////////////////////
@@ -673,53 +670,53 @@ message Field {
 }
 
 message FixedSizeBinary{
-  int32 length = 1;
+    int32 length = 1;
 }
 
 message Timestamp{
-  TimeUnit time_unit = 1;
-  string timezone = 2;
+    TimeUnit time_unit = 1;
+    string timezone = 2;
 }
 
 enum DateUnit{
-  Day = 0;
-  DateMillisecond = 1;
+    Day = 0;
+    DateMillisecond = 1;
 }
 
 enum TimeUnit{
-  Second = 0;
-  Millisecond = 1;
-  Microsecond = 2;
-  Nanosecond = 3;
+    Second = 0;
+    Millisecond = 1;
+    Microsecond = 2;
+    Nanosecond = 3;
 }
 
 enum IntervalUnit{
-  YearMonth = 0;
-  DayTime = 1;
-  MonthDayNano = 2;
+    YearMonth = 0;
+    DayTime = 1;
+    MonthDayNano = 2;
 }
 
 message Decimal{
-  uint64 whole = 1;
-  uint64 fractional = 2;
+    uint64 whole = 1;
+    uint64 fractional = 2;
 }
 
 message List{
-  Field field_type = 1;
+    Field field_type = 1;
 }
 
 message FixedSizeList{
-  Field field_type = 1;
-  int32 list_size = 2;
+    Field field_type = 1;
+    int32 list_size = 2;
 }
 
 message Dictionary{
-  ArrowType key = 1;
-  ArrowType value = 2;
+    ArrowType key = 1;
+    ArrowType value = 2;
 }
 
 message Struct{
-  repeated Field sub_field_types = 1;
+    repeated Field sub_field_types = 1;
 }
 
 enum UnionMode{
@@ -728,17 +725,17 @@ enum UnionMode{
 }
 
 message Union{
-  repeated Field union_types = 1;
-  UnionMode union_mode = 2;
-  repeated int32 type_ids = 3;
+    repeated Field union_types = 1;
+    UnionMode union_mode = 2;
+    repeated int32 type_ids = 3;
 }
 
 message ScalarListValue{
-  // encode null explicitly to distinguish a list with a null value
-  // from a list with no values)
-  bool is_null = 3;
-  Field field = 1;
-  repeated ScalarValue values = 2;
+    // encode null explicitly to distinguish a list with a null value
+    // from a list with no values)
+    bool is_null = 3;
+    Field field = 1;
+    repeated ScalarValue values = 2;
 }
 
 message ScalarTimestampValue {
@@ -770,41 +767,51 @@ message StructValue {
   repeated Field fields = 3;
 }
 
+message ScalarFixedSizeBinary{
+    bytes values =  1;
+    int32 length = 2;
+}
+
 message ScalarValue{
+  // was PrimitiveScalarType null_value = 19;
+  reserved 19;
+
   oneof value {
-    // Null value of any type (type is encoded)
-    PrimitiveScalarType null_value = 19;
-
-    bool   bool_value = 1;
-    string utf8_value = 2;
-    string large_utf8_value = 3;
-    int32  int8_value = 4;
-    int32  int16_value = 5;
-    int32  int32_value = 6;
-    int64  int64_value = 7;
-    uint32 uint8_value = 8;
-    uint32 uint16_value = 9;
-    uint32 uint32_value = 10;
-    uint64 uint64_value = 11;
-    float  float32_value = 12;
-    double float64_value = 13;
-    //Literal Date32 value always has a unit of day
-    int32  date_32_value = 14;
-    ScalarListValue list_value = 17;
-    //WAS: ScalarType null_list_value = 18;
-
-    Decimal128 decimal128_value = 20;
-    int64 date_64_value = 21;
-    int32 interval_yearmonth_value = 24;
-    int64 interval_daytime_value = 25;
-    ScalarTimestampValue timestamp_value = 26;
-    ScalarDictionaryValue dictionary_value = 27;
-    bytes binary_value = 28;
-    bytes large_binary_value = 29;
-    int64 time64_value = 30;
-    IntervalMonthDayNanoValue interval_month_day_nano = 31;
-    StructValue struct_value = 32;
-  }
+        // was PrimitiveScalarType null_value = 19;
+        // Null value of any type
+        ArrowType null_value = 33;
+
+        bool   bool_value = 1;
+        string utf8_value = 2;
+        string large_utf8_value = 3;
+        int32  int8_value = 4;
+        int32  int16_value = 5;
+        int32  int32_value = 6;
+        int64  int64_value = 7;
+        uint32 uint8_value = 8;
+        uint32 uint16_value = 9;
+        uint32 uint32_value = 10;
+        uint64 uint64_value = 11;
+        float  float32_value = 12;
+        double float64_value = 13;
+        // Literal Date32 value always has a unit of day
+        int32  date_32_value = 14;
+        ScalarListValue list_value = 17;
+        //WAS: ScalarType null_list_value = 18;
+
+        Decimal128 decimal128_value = 20;
+        int64 date_64_value = 21;
+        int32 interval_yearmonth_value = 24;
+        int64 interval_daytime_value = 25;
+        ScalarTimestampValue timestamp_value = 26;
+        ScalarDictionaryValue dictionary_value = 27;
+        bytes binary_value = 28;
+        bytes large_binary_value = 29;
+        int64 time64_value = 30;
+        IntervalMonthDayNanoValue interval_month_day_nano = 31;
+        StructValue struct_value = 32;
+        ScalarFixedSizeBinary fixed_size_binary_value = 34;
+    }
 }
 
 message Decimal128{
@@ -813,81 +820,42 @@ message Decimal128{
   int64 s = 3;
 }
 
-// Contains all valid datafusion scalar type except for
-// List
-enum PrimitiveScalarType{
-
-  BOOL = 0;     // arrow::Type::BOOL
-  UINT8 = 1;    // arrow::Type::UINT8
-  INT8 = 2;     // arrow::Type::INT8
-  UINT16 = 3;   // represents arrow::Type fields in src/arrow/type.h
-  INT16 = 4;
-  UINT32 = 5;
-  INT32 = 6;
-  UINT64 = 7;
-  INT64 = 8;
-  FLOAT32 = 9;
-  FLOAT64 = 10;
-  UTF8 = 11;
-  LARGE_UTF8 = 12;
-  DATE32 = 13;
-  TIMESTAMP_MICROSECOND = 14;
-  TIMESTAMP_NANOSECOND = 15;
-  NULL = 16;
-  DECIMAL128 = 17;
-  DATE64 = 20;
-  TIMESTAMP_SECOND = 21;
-  TIMESTAMP_MILLISECOND = 22;
-  INTERVAL_YEARMONTH = 23;
-  INTERVAL_DAYTIME = 24;
-  INTERVAL_MONTHDAYNANO = 28;
-
-  BINARY = 25;
-  LARGE_BINARY = 26;
-
-  TIME64 = 27;
-}
-
-
-// Broke out into multiple message types so that type
-// metadata did not need to be in separate message
-// All types that are of the empty message types contain no additional metadata
-// about the type
+// Serialized data type
 message ArrowType{
-  oneof arrow_type_enum{
-    EmptyMessage NONE = 1;     // arrow::Type::NA
-    EmptyMessage BOOL =  2;     // arrow::Type::BOOL
-    EmptyMessage UINT8 = 3;    // arrow::Type::UINT8
-    EmptyMessage INT8 =  4;     // arrow::Type::INT8
-    EmptyMessage UINT16 =5;   // represents arrow::Type fields in src/arrow/type.h
-    EmptyMessage INT16 = 6;
-    EmptyMessage UINT32 =7;
-    EmptyMessage INT32 = 8;
-    EmptyMessage UINT64 =9;
-    EmptyMessage INT64 =10 ;
-    EmptyMessage FLOAT16 =11 ;
-    EmptyMessage FLOAT32 =12 ;
-    EmptyMessage FLOAT64 =13 ;
-    EmptyMessage UTF8 =14 ;
-    EmptyMessage LARGE_UTF8 = 32;
-    EmptyMessage BINARY =15 ;
-    int32 FIXED_SIZE_BINARY =16 ;
-    EmptyMessage LARGE_BINARY = 31;
-    EmptyMessage DATE32 =17 ;
-    EmptyMessage DATE64 =18 ;
-    TimeUnit DURATION = 19;
-    Timestamp TIMESTAMP =20 ;
-    TimeUnit TIME32 =21 ;
-    TimeUnit TIME64 =22 ;
-    IntervalUnit INTERVAL =23 ;
-    Decimal DECIMAL =24 ;
-    List LIST =25;
-    List LARGE_LIST = 26;
-    FixedSizeList FIXED_SIZE_LIST = 27;
-    Struct STRUCT =28;
-    Union UNION =29;
-    Dictionary DICTIONARY =30;
-  }
+    oneof arrow_type_enum {
+        EmptyMessage NONE = 1;     // arrow::Type::NA
+        EmptyMessage BOOL =  2;     // arrow::Type::BOOL
+        EmptyMessage UINT8 = 3;    // arrow::Type::UINT8
+        EmptyMessage INT8 =  4;     // arrow::Type::INT8
+        EmptyMessage UINT16 =5;   // represents arrow::Type fields in src/arrow/type.h
+        EmptyMessage INT16 = 6;
+        EmptyMessage UINT32 =7;
+        EmptyMessage INT32 = 8;
+        EmptyMessage UINT64 =9;
+        EmptyMessage INT64 =10 ;
+        EmptyMessage FLOAT16 =11 ;
+        EmptyMessage FLOAT32 =12 ;
+        EmptyMessage FLOAT64 =13 ;
+        EmptyMessage UTF8 =14 ;
+        EmptyMessage LARGE_UTF8 = 32;
+        EmptyMessage BINARY =15 ;
+        int32 FIXED_SIZE_BINARY =16 ;
+        EmptyMessage LARGE_BINARY = 31;
+        EmptyMessage DATE32 =17 ;
+        EmptyMessage DATE64 =18 ;
+        TimeUnit DURATION = 19;
+        Timestamp TIMESTAMP =20 ;
+        TimeUnit TIME32 =21 ;
+        TimeUnit TIME64 =22 ;
+        IntervalUnit INTERVAL =23 ;
+        Decimal DECIMAL =24 ;
+        List LIST =25;
+        List LARGE_LIST = 26;
+        FixedSizeList FIXED_SIZE_LIST = 27;
+        Struct STRUCT =28;
+        Union UNION =29;
+        Dictionary DICTIONARY =30;
+    }
 }
 
 //Useful for representing an empty enum variant in rust
@@ -923,4 +891,4 @@ message PlanType {
 message StringifiedPlan {
   PlanType plan_type = 1;
   string plan = 2;
-}
+}
\ No newline at end of file
diff --git a/ballista/core/src/serde/physical_plan/mod.rs b/ballista/core/src/serde/physical_plan/mod.rs
index 610fe5fa..1deab14b 100644
--- a/ballista/core/src/serde/physical_plan/mod.rs
+++ b/ballista/core/src/serde/physical_plan/mod.rs
@@ -1463,8 +1463,9 @@ mod roundtrip_tests {
             JoinType::Left,
             JoinType::Right,
             JoinType::Full,
-            JoinType::Anti,
-            JoinType::Semi,
+            JoinType::LeftAnti,
+            JoinType::LeftSemi,
+            JoinType::RightSemi,
         ] {
             for partition_mode in
                 &[PartitionMode::Partitioned, PartitionMode::CollectLeft]
diff --git a/ballista/executor/Cargo.toml b/ballista/executor/Cargo.toml
index 6db40c6f..a91b7e5e 100644
--- a/ballista/executor/Cargo.toml
+++ b/ballista/executor/Cargo.toml
@@ -42,8 +42,8 @@ ballista-core = { path = "../core", version = "0.9.0" }
 chrono = { version = "0.4", default-features = false }
 configure_me = "0.4.0"
 dashmap = "5.4.0"
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "fa5cd7f30d785c5b3355e425e082a9d5a91bf567" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "fa5cd7f30d785c5b3355e425e082a9d5a91bf567" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "925a96225e2142d8adc53a3323ddf612fffe2007" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "925a96225e2142d8adc53a3323ddf612fffe2007" }
 futures = "0.3"
 hyper = "0.14.4"
 log = "0.4"
diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml
index 82bc79d0..a6e9fe15 100644
--- a/ballista/scheduler/Cargo.toml
+++ b/ballista/scheduler/Cargo.toml
@@ -46,8 +46,8 @@ base64 = { version = "0.13", default-features = false }
 clap = { version = "3", features = ["derive", "cargo"] }
 configure_me = "0.4.0"
 dashmap = "5.4.0"
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "fa5cd7f30d785c5b3355e425e082a9d5a91bf567" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "fa5cd7f30d785c5b3355e425e082a9d5a91bf567" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "925a96225e2142d8adc53a3323ddf612fffe2007" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "925a96225e2142d8adc53a3323ddf612fffe2007" }
 etcd-client = { version = "0.10", optional = true }
 flatbuffers = { version = "22.9.29" }
 futures = "0.3"
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 135a334c..4c0fc235 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -34,8 +34,8 @@ snmalloc = ["snmalloc-rs"]
 
 [dependencies]
 ballista = { path = "../ballista/client", version = "0.9.0" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "fa5cd7f30d785c5b3355e425e082a9d5a91bf567" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "fa5cd7f30d785c5b3355e425e082a9d5a91bf567" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "925a96225e2142d8adc53a3323ddf612fffe2007" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "925a96225e2142d8adc53a3323ddf612fffe2007" }
 env_logger = "0.9"
 futures = "0.3"
 mimalloc = { version = "0.1", optional = true, default-features = false }
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index 0f551528..d01029f4 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -35,7 +35,7 @@ required-features = ["ballista/standalone"]
 
 [dependencies]
 ballista = { path = "../ballista/client", version = "0.9.0" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "fa5cd7f30d785c5b3355e425e082a9d5a91bf567" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "925a96225e2142d8adc53a3323ddf612fffe2007" }
 futures = "0.3"
 num_cpus = "1.13.0"
 prost = "0.11"
diff --git a/python/Cargo.toml b/python/Cargo.toml
index 2c0d67c8..a4d2ef10 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -36,7 +36,7 @@ default = ["mimalloc"]
 [dependencies]
 async-trait = "0.1"
 ballista = { path = "../ballista/client", version = "0.9.0" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "fa5cd7f30d785c5b3355e425e082a9d5a91bf567", features = ["pyarrow"] }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "925a96225e2142d8adc53a3323ddf612fffe2007", features = ["pyarrow"] }
 futures = "0.3"
 mimalloc = { version = "*", optional = true, default-features = false }
 pyo3 = { version = "~0.17.1", features = ["extension-module", "abi3", "abi3-py37"] }
diff --git a/python/src/dataframe.rs b/python/src/dataframe.rs
index 3ee7680c..645cdebc 100644
--- a/python/src/dataframe.rs
+++ b/python/src/dataframe.rs
@@ -141,8 +141,9 @@ impl PyDataFrame {
             "left" => JoinType::Left,
             "right" => JoinType::Right,
             "full" => JoinType::Full,
-            "semi" => JoinType::Semi,
-            "anti" => JoinType::Anti,
+            "semi" => JoinType::LeftSemi,
+            "anti" => JoinType::LeftAnti,
+            "right_semi" => JoinType::RightSemi,
             how => {
                 return Err(DataFusionError::Common(format!(
                     "The join type {} does not exist or is not implemented",