You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/07/13 19:30:57 UTC
[arrow-ballista] branch master updated: Use latest DataFusion (#86)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new 56ec4dff Use latest DataFusion (#86)
56ec4dff is described below
commit 56ec4dff02ed1b4e801842a66bfe151eaf461fce
Author: yahoNanJing <90...@users.noreply.github.com>
AuthorDate: Thu Jul 14 03:30:52 2022 +0800
Use latest DataFusion (#86)
* Update datafusion dependency to commit d0d5564b8f689a01e542b8c1df829d74d0fab2b0
* Fix inconsistency
* Use latest DataFusion
* Fix tomlfmt
* Fix PR review
Co-authored-by: yangzhong <ya...@ebay.com>
---
Cargo.toml | 2 +-
ballista-cli/Cargo.toml | 4 +-
ballista/rust/client/Cargo.toml | 4 +-
ballista/rust/client/src/context.rs | 14 +-
ballista/rust/core/Cargo.toml | 7 +-
ballista/rust/core/proto/ballista.proto | 223 +--------
ballista/rust/core/proto/datafusion.proto | 516 ++++++++++++++++-----
.../rust/core/src/serde/logical_plan/from_proto.rs | 52 ---
ballista/rust/core/src/serde/logical_plan/mod.rs | 424 -----------------
ballista/rust/core/src/serde/mod.rs | 52 +--
.../core/src/serde/physical_plan/from_proto.rs | 84 ++--
ballista/rust/core/src/serde/physical_plan/mod.rs | 53 ++-
.../rust/core/src/serde/physical_plan/to_proto.rs | 10 +-
ballista/rust/executor/Cargo.toml | 8 +-
ballista/rust/executor/src/execution_loop.rs | 1 +
ballista/rust/executor/src/executor_server.rs | 1 +
ballista/rust/scheduler/Cargo.toml | 5 +-
.../rust/scheduler/src/scheduler_server/grpc.rs | 69 +--
.../rust/scheduler/src/scheduler_server/mod.rs | 57 ++-
benchmarks/Cargo.toml | 4 +-
examples/Cargo.toml | 2 +-
21 files changed, 594 insertions(+), 998 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 9b9e7932..fc4c25ca 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -28,7 +28,7 @@ exclude = ["ballista-cli", "python"]
# cargo build --profile release-lto
[profile.release-lto]
-inherits = "release"
codegen-units = 1
+inherits = "release"
lto = true
diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index db0ad6b7..11c2e772 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -31,8 +31,8 @@ readme = "README.md"
[dependencies]
ballista = { path = "../ballista/rust/client", version = "0.7.0" }
clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
-datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
+datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
dirs = "4.0.0"
env_logger = "0.9"
mimalloc = { version = "0.1", default-features = false }
diff --git a/ballista/rust/client/Cargo.toml b/ballista/rust/client/Cargo.toml
index f20ed712..9cf4eb33 100644
--- a/ballista/rust/client/Cargo.toml
+++ b/ballista/rust/client/Cargo.toml
@@ -31,8 +31,8 @@ rust-version = "1.59"
ballista-core = { path = "../core", version = "0.7.0" }
ballista-executor = { path = "../executor", version = "0.7.0", optional = true }
ballista-scheduler = { path = "../scheduler", version = "0.7.0", optional = true }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
futures = "0.3"
log = "0.4"
parking_lot = "0.12"
diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs
index 09ff9cee..03cd0799 100644
--- a/ballista/rust/client/src/context.rs
+++ b/ballista/rust/client/src/context.rs
@@ -440,6 +440,8 @@ impl BallistaContext {
#[cfg(test)]
mod tests {
+ #[cfg(feature = "standalone")]
+ use datafusion::datasource::listing::ListingTableUrl;
#[tokio::test]
#[cfg(feature = "standalone")]
@@ -591,10 +593,14 @@ mod tests {
target_partitions: x.target_partitions,
};
- let config =
- ListingTableConfig::new(listing_table.table_path().clone())
- .with_schema(Arc::new(Schema::new(vec![])))
- .with_listing_options(error_options);
+ let table_paths = listing_table
+ .table_paths()
+ .iter()
+ .map(|t| ListingTableUrl::parse(t).unwrap())
+ .collect();
+ let config = ListingTableConfig::new_with_multi_paths(table_paths)
+ .with_schema(Arc::new(Schema::new(vec![])))
+ .with_listing_options(error_options);
let error_table = ListingTable::try_new(config).unwrap();
diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml
index a3803ab5..32f1c47e 100644
--- a/ballista/rust/core/Cargo.toml
+++ b/ballista/rust/core/Cargo.toml
@@ -35,17 +35,18 @@ simd = ["datafusion/simd"]
[dependencies]
ahash = { version = "0.7", default-features = false }
-arrow-flight = { version = "16.0.0" }
+arrow-flight = { version = "18.0.0" }
async-trait = "0.1.41"
chrono = { version = "0.4", default-features = false }
clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
futures = "0.3"
hashbrown = "0.12"
libloading = "0.7.3"
log = "0.4"
+object_store = "0.3.0"
once_cell = "1.9.0"
parking_lot = "0.12"
diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 0af951cf..f899a603 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -21,7 +21,7 @@ syntax = "proto3";
package ballista.protobuf;
option java_multiple_files = true;
-option java_package = "org.ballistacompute.protobuf";
+option java_package = "org.apache.arrow.ballista.protobuf";
option java_outer_classname = "BallistaProto";
import "datafusion.proto";
@@ -30,43 +30,6 @@ import "datafusion.proto";
// Ballista Logical Plan
///////////////////////////////////////////////////////////////////////////////////////////////////
-// LogicalPlan is a nested type
-message LogicalPlanNode {
- oneof LogicalPlanType {
- ListingTableScanNode listing_scan = 1;
- ProjectionNode projection = 3;
- SelectionNode selection = 4;
- LimitNode limit = 5;
- AggregateNode aggregate = 6;
- JoinNode join = 7;
- SortNode sort = 8;
- RepartitionNode repartition = 9;
- EmptyRelationNode empty_relation = 10;
- CreateExternalTableNode create_external_table = 11;
- ExplainNode explain = 12;
- WindowNode window = 13;
- AnalyzeNode analyze = 14;
- CrossJoinNode cross_join = 15;
- ValuesNode values = 16;
- LogicalExtensionNode extension = 17;
- CreateCatalogSchemaNode create_catalog_schema = 18;
- UnionNode union = 19;
- CreateCatalogNode create_catalog = 20;
- SubqueryAliasNode subquery_alias = 21;
- CreateViewNode create_view = 22;
- OffsetNode offset = 23;
- }
-}
-
-message LogicalExtensionNode {
- bytes node = 1;
- repeated LogicalPlanNode inputs = 2;
-}
-
-message ProjectionColumns {
- repeated string columns = 1;
-}
-
message Statistics {
int64 num_rows = 1;
int64 total_byte_size = 2;
@@ -87,186 +50,6 @@ message PartitionedFile {
FileRange range = 5;
}
-message CsvFormat {
- bool has_header = 1;
- string delimiter = 2;
-}
-
-message ParquetFormat {
- bool enable_pruning = 1;
-}
-
-message AvroFormat {}
-
-message ListingTableScanNode {
- string table_name = 1;
- string path = 2;
- string file_extension = 3;
- ProjectionColumns projection = 4;
- datafusion.Schema schema = 5;
- repeated datafusion.LogicalExprNode filters = 6;
- repeated string table_partition_cols = 7;
- bool collect_stat = 8;
- uint32 target_partitions = 9;
- oneof FileFormatType {
- CsvFormat csv = 10;
- ParquetFormat parquet = 11;
- AvroFormat avro = 12;
- }
-}
-
-message ProjectionNode {
- LogicalPlanNode input = 1;
- repeated datafusion.LogicalExprNode expr = 2;
- oneof optional_alias {
- string alias = 3;
- }
-}
-
-message SelectionNode {
- LogicalPlanNode input = 1;
- datafusion.LogicalExprNode expr = 2;
-}
-
-message SortNode {
- LogicalPlanNode input = 1;
- repeated datafusion.LogicalExprNode expr = 2;
-}
-
-message RepartitionNode {
- LogicalPlanNode input = 1;
- oneof partition_method {
- uint64 round_robin = 2;
- HashRepartition hash = 3;
- }
-}
-
-message HashRepartition {
- repeated datafusion.LogicalExprNode hash_expr = 1;
- uint64 partition_count = 2;
-}
-
-message EmptyRelationNode {
- bool produce_one_row = 1;
-}
-
-message CreateExternalTableNode {
- string name = 1;
- string location = 2;
- FileType file_type = 3;
- bool has_header = 4;
- datafusion.DfSchema schema = 5;
- repeated string table_partition_cols = 6;
- bool if_not_exists = 7;
- string delimiter = 8;
-}
-
-message CreateCatalogSchemaNode {
- string schema_name = 1;
- bool if_not_exists = 2;
- datafusion.DfSchema schema = 3;
-}
-
-message CreateCatalogNode {
- string catalog_name = 1;
- bool if_not_exists = 2;
- datafusion.DfSchema schema = 3;
-}
-
-message CreateViewNode {
- string name = 1;
- LogicalPlanNode input = 2;
- bool or_replace = 3;
-}
-
-// a node containing data for defining values list. unlike in SQL where it's two dimensional, here
-// the list is flattened, and with the field n_cols it can be parsed and partitioned into rows
-message ValuesNode {
- uint64 n_cols = 1;
- repeated datafusion.LogicalExprNode values_list = 2;
-}
-
-enum FileType {
- NdJson = 0;
- Parquet = 1;
- CSV = 2;
- Avro = 3;
-}
-
-message AnalyzeNode {
- LogicalPlanNode input = 1;
- bool verbose = 2;
-}
-
-message ExplainNode {
- LogicalPlanNode input = 1;
- bool verbose = 2;
-}
-
-message AggregateNode {
- LogicalPlanNode input = 1;
- repeated datafusion.LogicalExprNode group_expr = 2;
- repeated datafusion.LogicalExprNode aggr_expr = 3;
-}
-
-message WindowNode {
- LogicalPlanNode input = 1;
- repeated datafusion.LogicalExprNode window_expr = 2;
-}
-
-enum JoinType {
- INNER = 0;
- LEFT = 1;
- RIGHT = 2;
- FULL = 3;
- SEMI = 4;
- ANTI = 5;
-}
-
-enum JoinConstraint {
- ON = 0;
- USING = 1;
-}
-
-message JoinNode {
- LogicalPlanNode left = 1;
- LogicalPlanNode right = 2;
- JoinType join_type = 3;
- JoinConstraint join_constraint = 4;
- repeated datafusion.Column left_join_column = 5;
- repeated datafusion.Column right_join_column = 6;
- bool null_equals_null = 7;
- datafusion.LogicalExprNode filter = 8;
-}
-
-message UnionNode {
- repeated LogicalPlanNode inputs = 1;
-}
-
-message CrossJoinNode {
- LogicalPlanNode left = 1;
- LogicalPlanNode right = 2;
-}
-
-message LimitNode {
- LogicalPlanNode input = 1;
- uint32 limit = 2;
-}
-
-message OffsetNode {
- LogicalPlanNode input = 1;
- uint32 offset = 2;
-}
-
-message SelectionExecNode {
- datafusion.LogicalExprNode expr = 1;
-}
-
-message SubqueryAliasNode {
- LogicalPlanNode input = 1;
- string alias = 2;
-}
-
///////////////////////////////////////////////////////////////////////////////////////////////////
// Ballista Physical Plan
///////////////////////////////////////////////////////////////////////////////////////////////////
@@ -480,7 +263,7 @@ message HashJoinExecNode {
PhysicalPlanNode left = 1;
PhysicalPlanNode right = 2;
repeated JoinOn on = 3;
- JoinType join_type = 4;
+ datafusion.JoinType join_type = 4;
PartitionMode partition_mode = 6;
bool null_equals_null = 7;
JoinFilter filter = 8;
@@ -893,7 +676,7 @@ message GetJobStatusResult {
message GetFileMetadataParams {
string path = 1;
- FileType file_type = 2;
+ datafusion.FileType file_type = 2;
}
message GetFileMetadataResult {
diff --git a/ballista/rust/core/proto/datafusion.proto b/ballista/rust/core/proto/datafusion.proto
index 9999abbf..09c97029 100644
--- a/ballista/rust/core/proto/datafusion.proto
+++ b/ballista/rust/core/proto/datafusion.proto
@@ -21,7 +21,7 @@ syntax = "proto3";
package datafusion;
option java_multiple_files = true;
-option java_package = "org.datafusioncompute.protobuf";
+option java_package = "org.apache.arrow.datafusion.protobuf";
option java_outer_classname = "DatafusionProto";
message ColumnRelation {
@@ -40,6 +40,228 @@ message DfField{
message DfSchema {
repeated DfField columns = 1;
+ map<string, string> metadata = 2;
+}
+
+// logical plan
+// LogicalPlan is a nested type
+message LogicalPlanNode {
+ oneof LogicalPlanType {
+ ListingTableScanNode listing_scan = 1;
+ ProjectionNode projection = 3;
+ SelectionNode selection = 4;
+ LimitNode limit = 5;
+ AggregateNode aggregate = 6;
+ JoinNode join = 7;
+ SortNode sort = 8;
+ RepartitionNode repartition = 9;
+ EmptyRelationNode empty_relation = 10;
+ CreateExternalTableNode create_external_table = 11;
+ ExplainNode explain = 12;
+ WindowNode window = 13;
+ AnalyzeNode analyze = 14;
+ CrossJoinNode cross_join = 15;
+ ValuesNode values = 16;
+ LogicalExtensionNode extension = 17;
+ CreateCatalogSchemaNode create_catalog_schema = 18;
+ UnionNode union = 19;
+ CreateCatalogNode create_catalog = 20;
+ SubqueryAliasNode subquery_alias = 21;
+ CreateViewNode create_view = 22;
+ DistinctNode distinct = 23;
+ }
+}
+
+message LogicalExtensionNode {
+ bytes node = 1;
+ repeated LogicalPlanNode inputs = 2;
+}
+
+message ProjectionColumns {
+ repeated string columns = 1;
+}
+
+message CsvFormat {
+ bool has_header = 1;
+ string delimiter = 2;
+}
+
+message ParquetFormat {
+ bool enable_pruning = 1;
+}
+
+message AvroFormat {}
+
+message ListingTableScanNode {
+ string table_name = 1;
+ repeated string paths = 2;
+ string file_extension = 3;
+ ProjectionColumns projection = 4;
+ datafusion.Schema schema = 5;
+ repeated datafusion.LogicalExprNode filters = 6;
+ repeated string table_partition_cols = 7;
+ bool collect_stat = 8;
+ uint32 target_partitions = 9;
+ oneof FileFormatType {
+ CsvFormat csv = 10;
+ ParquetFormat parquet = 11;
+ AvroFormat avro = 12;
+ }
+}
+
+message ProjectionNode {
+ LogicalPlanNode input = 1;
+ repeated datafusion.LogicalExprNode expr = 2;
+ oneof optional_alias {
+ string alias = 3;
+ }
+}
+
+message SelectionNode {
+ LogicalPlanNode input = 1;
+ datafusion.LogicalExprNode expr = 2;
+}
+
+message SortNode {
+ LogicalPlanNode input = 1;
+ repeated datafusion.LogicalExprNode expr = 2;
+}
+
+message RepartitionNode {
+ LogicalPlanNode input = 1;
+ oneof partition_method {
+ uint64 round_robin = 2;
+ HashRepartition hash = 3;
+ }
+}
+
+message HashRepartition {
+ repeated datafusion.LogicalExprNode hash_expr = 1;
+ uint64 partition_count = 2;
+}
+
+message EmptyRelationNode {
+ bool produce_one_row = 1;
+}
+
+message CreateExternalTableNode {
+ string name = 1;
+ string location = 2;
+ FileType file_type = 3;
+ bool has_header = 4;
+ datafusion.DfSchema schema = 5;
+ repeated string table_partition_cols = 6;
+ bool if_not_exists = 7;
+ string delimiter = 8;
+}
+
+message CreateCatalogSchemaNode {
+ string schema_name = 1;
+ bool if_not_exists = 2;
+ datafusion.DfSchema schema = 3;
+}
+
+message CreateCatalogNode {
+ string catalog_name = 1;
+ bool if_not_exists = 2;
+ datafusion.DfSchema schema = 3;
+}
+
+message CreateViewNode {
+ string name = 1;
+ LogicalPlanNode input = 2;
+ bool or_replace = 3;
+ string definition = 4;
+}
+
+// a node containing data for defining values list. unlike in SQL where it's two dimensional, here
+// the list is flattened, and with the field n_cols it can be parsed and partitioned into rows
+message ValuesNode {
+ uint64 n_cols = 1;
+ repeated datafusion.LogicalExprNode values_list = 2;
+}
+
+enum FileType {
+ NdJson = 0;
+ Parquet = 1;
+ CSV = 2;
+ Avro = 3;
+}
+
+message AnalyzeNode {
+ LogicalPlanNode input = 1;
+ bool verbose = 2;
+}
+
+message ExplainNode {
+ LogicalPlanNode input = 1;
+ bool verbose = 2;
+}
+
+message AggregateNode {
+ LogicalPlanNode input = 1;
+ repeated datafusion.LogicalExprNode group_expr = 2;
+ repeated datafusion.LogicalExprNode aggr_expr = 3;
+}
+
+message WindowNode {
+ LogicalPlanNode input = 1;
+ repeated datafusion.LogicalExprNode window_expr = 2;
+}
+
+enum JoinType {
+ INNER = 0;
+ LEFT = 1;
+ RIGHT = 2;
+ FULL = 3;
+ SEMI = 4;
+ ANTI = 5;
+}
+
+enum JoinConstraint {
+ ON = 0;
+ USING = 1;
+}
+
+message JoinNode {
+ LogicalPlanNode left = 1;
+ LogicalPlanNode right = 2;
+ JoinType join_type = 3;
+ JoinConstraint join_constraint = 4;
+ repeated datafusion.Column left_join_column = 5;
+ repeated datafusion.Column right_join_column = 6;
+ bool null_equals_null = 7;
+ LogicalExprNode filter = 8;
+}
+
+message DistinctNode {
+ LogicalPlanNode input = 1;
+}
+
+message UnionNode {
+ repeated LogicalPlanNode inputs = 1;
+}
+
+message CrossJoinNode {
+ LogicalPlanNode left = 1;
+ LogicalPlanNode right = 2;
+}
+
+message LimitNode {
+ LogicalPlanNode input = 1;
+ // The number of rows to skip before fetch; non-positive means don't skip any
+ int64 skip = 2;
+ // Maximum number of rows to fetch; negative means no limit
+ int64 fetch = 3;
+}
+
+message SelectionExecNode {
+ datafusion.LogicalExprNode expr = 1;
+}
+
+message SubqueryAliasNode {
+ LogicalPlanNode input = 1;
+ string alias = 2;
}
// logical expressions
@@ -76,9 +298,46 @@ message LogicalExprNode {
// window expressions
WindowExprNode window_expr = 18;
+
+ // AggregateUDF expressions
+ AggregateUDFExprNode aggregate_udf_expr = 19;
+
+ // Scalar UDF expressions
+ ScalarUDFExprNode scalar_udf_expr = 20;
+
+ GetIndexedField get_indexed_field = 21;
+
+ GroupingSetNode grouping_set = 22;
+
+ CubeNode cube = 23;
+
+ RollupNode rollup = 24;
}
}
+message LogicalExprList {
+ repeated LogicalExprNode expr = 1;
+}
+
+message GroupingSetNode {
+ repeated LogicalExprList expr = 1;
+}
+
+message CubeNode {
+ repeated LogicalExprNode expr = 1;
+}
+
+message RollupNode {
+ repeated LogicalExprNode expr = 1;
+}
+
+
+
+message GetIndexedField {
+ LogicalExprNode expr = 1;
+ ScalarValue key = 2;
+}
+
message IsNull {
LogicalExprNode expr = 1;
}
@@ -177,6 +436,8 @@ enum ScalarFunction {
Trim=61;
Upper=62;
Coalesce=63;
+ Power=64;
+ StructFun=65;
}
message ScalarFunctionNode {
@@ -202,6 +463,7 @@ enum AggregateFunction {
APPROX_PERCENTILE_CONT = 14;
APPROX_MEDIAN=15;
APPROX_PERCENTILE_CONT_WITH_WEIGHT = 16;
+ GROUPING = 17;
}
message AggregateExprNode {
@@ -209,6 +471,16 @@ message AggregateExprNode {
repeated LogicalExprNode expr = 2;
}
+message AggregateUDFExprNode {
+ string fun_name = 1;
+ repeated LogicalExprNode args = 2;
+}
+
+message ScalarUDFExprNode {
+ string fun_name = 1;
+ repeated LogicalExprNode args = 2;
+}
+
enum BuiltInWindowFunction {
ROW_NUMBER = 0;
RANK = 1;
@@ -321,53 +593,53 @@ message Field {
}
message FixedSizeBinary{
- int32 length = 1;
+ int32 length = 1;
}
message Timestamp{
- TimeUnit time_unit = 1;
- string timezone = 2;
+ TimeUnit time_unit = 1;
+ string timezone = 2;
}
enum DateUnit{
- Day = 0;
- DateMillisecond = 1;
+ Day = 0;
+ DateMillisecond = 1;
}
enum TimeUnit{
- Second = 0;
- TimeMillisecond = 1;
- Microsecond = 2;
- Nanosecond = 3;
+ Second = 0;
+ TimeMillisecond = 1;
+ Microsecond = 2;
+ Nanosecond = 3;
}
enum IntervalUnit{
- YearMonth = 0;
- DayTime = 1;
- MonthDayNano = 2;
+ YearMonth = 0;
+ DayTime = 1;
+ MonthDayNano = 2;
}
message Decimal{
- uint64 whole = 1;
- uint64 fractional = 2;
+ uint64 whole = 1;
+ uint64 fractional = 2;
}
message List{
- Field field_type = 1;
+ Field field_type = 1;
}
message FixedSizeList{
- Field field_type = 1;
- int32 list_size = 2;
+ Field field_type = 1;
+ int32 list_size = 2;
}
message Dictionary{
- ArrowType key = 1;
- ArrowType value = 2;
+ ArrowType key = 1;
+ ArrowType value = 2;
}
message Struct{
- repeated Field sub_field_types = 1;
+ repeated Field sub_field_types = 1;
}
enum UnionMode{
@@ -376,45 +648,53 @@ enum UnionMode{
}
message Union{
- repeated Field union_types = 1;
- UnionMode union_mode = 2;
+ repeated Field union_types = 1;
+ UnionMode union_mode = 2;
+ repeated int32 type_ids = 3;
}
message ScalarListValue{
- ScalarType datatype = 1;
- repeated ScalarValue values = 2;
+ ScalarType datatype = 1;
+ repeated ScalarValue values = 2;
+}
+
+message ScalarTimestampValue {
+ oneof value {
+ int64 time_microsecond_value = 1;
+ int64 time_nanosecond_value = 2;
+ int64 time_second_value = 3;
+ int64 time_millisecond_value = 4;
+ };
+ string timezone = 5;
}
message ScalarValue{
- oneof value {
- bool bool_value = 1;
- string utf8_value = 2;
- string large_utf8_value = 3;
- int32 int8_value = 4;
- int32 int16_value = 5;
- int32 int32_value = 6;
- int64 int64_value = 7;
- uint32 uint8_value = 8;
- uint32 uint16_value = 9;
- uint32 uint32_value = 10;
- uint64 uint64_value = 11;
- float float32_value = 12;
- double float64_value = 13;
- //Literal Date32 value always has a unit of day
- int32 date_32_value = 14;
- int64 time_microsecond_value = 15;
- int64 time_nanosecond_value = 16;
- ScalarListValue list_value = 17;
- ScalarType null_list_value = 18;
-
- PrimitiveScalarType null_value = 19;
- Decimal128 decimal128_value = 20;
- int64 date_64_value = 21;
- int64 time_second_value = 22;
- int64 time_millisecond_value = 23;
- int32 interval_yearmonth_value = 24;
- int64 interval_daytime_value = 25;
- }
+ oneof value {
+ bool bool_value = 1;
+ string utf8_value = 2;
+ string large_utf8_value = 3;
+ int32 int8_value = 4;
+ int32 int16_value = 5;
+ int32 int32_value = 6;
+ int64 int64_value = 7;
+ uint32 uint8_value = 8;
+ uint32 uint16_value = 9;
+ uint32 uint32_value = 10;
+ uint64 uint64_value = 11;
+ float float32_value = 12;
+ double float64_value = 13;
+ //Literal Date32 value always has a unit of day
+ int32 date_32_value = 14;
+ ScalarListValue list_value = 17;
+ ScalarType null_list_value = 18;
+
+ PrimitiveScalarType null_value = 19;
+ Decimal128 decimal128_value = 20;
+ int64 date_64_value = 21;
+ int32 interval_yearmonth_value = 24;
+ int64 interval_daytime_value = 25;
+ ScalarTimestampValue timestamp_value = 26;
+ }
}
message Decimal128{
@@ -427,42 +707,42 @@ message Decimal128{
// List
enum PrimitiveScalarType{
- BOOL = 0; // arrow::Type::BOOL
- UINT8 = 1; // arrow::Type::UINT8
- INT8 = 2; // arrow::Type::INT8
- UINT16 = 3; // represents arrow::Type fields in src/arrow/type.h
- INT16 = 4;
- UINT32 = 5;
- INT32 = 6;
- UINT64 = 7;
- INT64 = 8;
- FLOAT32 = 9;
- FLOAT64 = 10;
- UTF8 = 11;
- LARGE_UTF8 = 12;
- DATE32 = 13;
- TIME_MICROSECOND = 14;
- TIME_NANOSECOND = 15;
- NULL = 16;
-
- DECIMAL128 = 17;
- DATE64 = 20;
- TIME_SECOND = 21;
- TIME_MILLISECOND = 22;
- INTERVAL_YEARMONTH = 23;
- INTERVAL_DAYTIME = 24;
+ BOOL = 0; // arrow::Type::BOOL
+ UINT8 = 1; // arrow::Type::UINT8
+ INT8 = 2; // arrow::Type::INT8
+ UINT16 = 3; // represents arrow::Type fields in src/arrow/type.h
+ INT16 = 4;
+ UINT32 = 5;
+ INT32 = 6;
+ UINT64 = 7;
+ INT64 = 8;
+ FLOAT32 = 9;
+ FLOAT64 = 10;
+ UTF8 = 11;
+ LARGE_UTF8 = 12;
+ DATE32 = 13;
+ TIME_MICROSECOND = 14;
+ TIME_NANOSECOND = 15;
+ NULL = 16;
+
+ DECIMAL128 = 17;
+ DATE64 = 20;
+ TIME_SECOND = 21;
+ TIME_MILLISECOND = 22;
+ INTERVAL_YEARMONTH = 23;
+ INTERVAL_DAYTIME = 24;
}
message ScalarType{
- oneof datatype{
- PrimitiveScalarType scalar = 1;
- ScalarListType list = 2;
- }
+ oneof datatype{
+ PrimitiveScalarType scalar = 1;
+ ScalarListType list = 2;
+ }
}
message ScalarListType{
- repeated string field_names = 3;
- PrimitiveScalarType deepest_type = 2;
+ repeated string field_names = 3;
+ PrimitiveScalarType deepest_type = 2;
}
// Broke out into multiple message types so that type
@@ -470,40 +750,40 @@ message ScalarListType{
//All types that are of the empty message types contain no additional metadata
// about the type
message ArrowType{
- oneof arrow_type_enum{
- EmptyMessage NONE = 1; // arrow::Type::NA
- EmptyMessage BOOL = 2; // arrow::Type::BOOL
- EmptyMessage UINT8 = 3; // arrow::Type::UINT8
- EmptyMessage INT8 = 4; // arrow::Type::INT8
- EmptyMessage UINT16 =5; // represents arrow::Type fields in src/arrow/type.h
- EmptyMessage INT16 = 6;
- EmptyMessage UINT32 =7;
- EmptyMessage INT32 = 8;
- EmptyMessage UINT64 =9;
- EmptyMessage INT64 =10 ;
- EmptyMessage FLOAT16 =11 ;
- EmptyMessage FLOAT32 =12 ;
- EmptyMessage FLOAT64 =13 ;
- EmptyMessage UTF8 =14 ;
- EmptyMessage LARGE_UTF8 = 32;
- EmptyMessage BINARY =15 ;
- int32 FIXED_SIZE_BINARY =16 ;
- EmptyMessage LARGE_BINARY = 31;
- EmptyMessage DATE32 =17 ;
- EmptyMessage DATE64 =18 ;
- TimeUnit DURATION = 19;
- Timestamp TIMESTAMP =20 ;
- TimeUnit TIME32 =21 ;
- TimeUnit TIME64 =22 ;
- IntervalUnit INTERVAL =23 ;
- Decimal DECIMAL =24 ;
- List LIST =25;
- List LARGE_LIST = 26;
- FixedSizeList FIXED_SIZE_LIST = 27;
- Struct STRUCT =28;
- Union UNION =29;
- Dictionary DICTIONARY =30;
- }
+ oneof arrow_type_enum{
+ EmptyMessage NONE = 1; // arrow::Type::NA
+ EmptyMessage BOOL = 2; // arrow::Type::BOOL
+ EmptyMessage UINT8 = 3; // arrow::Type::UINT8
+ EmptyMessage INT8 = 4; // arrow::Type::INT8
+ EmptyMessage UINT16 =5; // represents arrow::Type fields in src/arrow/type.h
+ EmptyMessage INT16 = 6;
+ EmptyMessage UINT32 =7;
+ EmptyMessage INT32 = 8;
+ EmptyMessage UINT64 =9;
+ EmptyMessage INT64 =10 ;
+ EmptyMessage FLOAT16 =11 ;
+ EmptyMessage FLOAT32 =12 ;
+ EmptyMessage FLOAT64 =13 ;
+ EmptyMessage UTF8 =14 ;
+ EmptyMessage LARGE_UTF8 = 32;
+ EmptyMessage BINARY =15 ;
+ int32 FIXED_SIZE_BINARY =16 ;
+ EmptyMessage LARGE_BINARY = 31;
+ EmptyMessage DATE32 =17 ;
+ EmptyMessage DATE64 =18 ;
+ TimeUnit DURATION = 19;
+ Timestamp TIMESTAMP =20 ;
+ TimeUnit TIME32 =21 ;
+ TimeUnit TIME64 =22 ;
+ IntervalUnit INTERVAL =23 ;
+ Decimal DECIMAL =24 ;
+ List LIST =25;
+ List LARGE_LIST = 26;
+ FixedSizeList FIXED_SIZE_LIST = 27;
+ Struct STRUCT =28;
+ Union UNION =29;
+ Dictionary DICTIONARY =30;
+ }
}
//Useful for representing an empty enum variant in rust
diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
deleted file mode 100644
index a17c646c..00000000
--- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
+++ /dev/null
@@ -1,52 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Serde code to convert from protocol buffers to Rust data structures.
-
-use crate::error::BallistaError;
-use crate::serde::protobuf;
-use std::convert::TryFrom;
-
-impl TryFrom<i32> for protobuf::FileType {
- type Error = BallistaError;
- fn try_from(value: i32) -> Result<Self, Self::Error> {
- use protobuf::FileType;
- match value {
- _x if _x == FileType::NdJson as i32 => Ok(FileType::NdJson),
- _x if _x == FileType::Parquet as i32 => Ok(FileType::Parquet),
- _x if _x == FileType::Csv as i32 => Ok(FileType::Csv),
- _x if _x == FileType::Avro as i32 => Ok(FileType::Avro),
- invalid => Err(BallistaError::General(format!(
- "Attempted to convert invalid i32 to protobuf::Filetype: {}",
- invalid
- ))),
- }
- }
-}
-
-#[allow(clippy::from_over_into)]
-impl Into<datafusion::logical_plan::FileType> for protobuf::FileType {
- fn into(self) -> datafusion::logical_plan::FileType {
- use datafusion::logical_plan::FileType;
- match self {
- protobuf::FileType::NdJson => FileType::NdJson,
- protobuf::FileType::Parquet => FileType::Parquet,
- protobuf::FileType::Csv => FileType::CSV,
- protobuf::FileType::Avro => FileType::Avro,
- }
- }
-}
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
deleted file mode 100644
index 5952a095..00000000
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ /dev/null
@@ -1,424 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-pub mod from_proto;
-
-#[macro_export]
-macro_rules! into_logical_plan {
- ($PB:expr, $CTX:expr, $CODEC:expr) => {{
- if let Some(field) = $PB.as_ref() {
- field.as_ref().try_into_logical_plan($CTX, $CODEC)
- } else {
- Err(proto_error("Missing required field in protobuf"))
- }
- }};
-}
-
-#[cfg(test)]
-mod roundtrip_tests {
-
- use super::super::{super::error::Result, protobuf};
- use crate::serde::{AsLogicalPlan, BallistaCodec};
- use async_trait::async_trait;
- use core::panic;
- use datafusion::common::DFSchemaRef;
- use datafusion::datasource::listing::ListingTableUrl;
- use datafusion::logical_plan::source_as_provider;
- use datafusion::{
- arrow::datatypes::{DataType, Field, Schema},
- common::ToDFSchema,
- datafusion_data_access::{
- self,
- object_store::{FileMetaStream, ListEntryStream, ObjectReader, ObjectStore},
- SizedFile,
- },
- datasource::listing::ListingTable,
- logical_expr::{
- binary_expr, col,
- logical_plan::{
- CreateExternalTable, FileType, LogicalPlan, LogicalPlanBuilder,
- Repartition,
- },
- Expr, Operator,
- },
- prelude::*,
- };
- use std::io;
- use std::sync::Arc;
-
- #[derive(Debug)]
- struct TestObjectStore {}
-
- #[async_trait]
- impl ObjectStore for TestObjectStore {
- async fn list_file(
- &self,
- _prefix: &str,
- ) -> datafusion_data_access::Result<FileMetaStream> {
- Err(io::Error::new(
- io::ErrorKind::Unsupported,
- "this is only a test object store".to_string(),
- ))
- }
-
- async fn list_dir(
- &self,
- _prefix: &str,
- _delimiter: Option<String>,
- ) -> datafusion_data_access::Result<ListEntryStream> {
- Err(io::Error::new(
- io::ErrorKind::Unsupported,
- "this is only a test object store".to_string(),
- ))
- }
-
- fn file_reader(
- &self,
- _file: SizedFile,
- ) -> datafusion_data_access::Result<Arc<dyn ObjectReader>> {
- Err(io::Error::new(
- io::ErrorKind::Unsupported,
- "this is only a test object store".to_string(),
- ))
- }
- }
-
- // Given a identity of a LogicalPlan converts it to protobuf and back, using debug formatting to test equality.
- macro_rules! roundtrip_test {
- ($initial_struct:ident, $proto_type:ty, $struct_type:ty) => {
- let proto: $proto_type = (&$initial_struct).try_into()?;
-
- let round_trip: $struct_type = (&proto).try_into()?;
-
- assert_eq!(
- format!("{:?}", $initial_struct),
- format!("{:?}", round_trip)
- );
- };
- ($initial_struct:ident, $struct_type:ty) => {
- roundtrip_test!($initial_struct, protobuf::LogicalPlanNode, $struct_type);
- };
- ($initial_struct:ident) => {
- let ctx = SessionContext::new();
- let codec: BallistaCodec<
- datafusion_proto::protobuf::LogicalPlanNode,
- protobuf::PhysicalPlanNode,
- > = BallistaCodec::default();
- let proto: datafusion_proto::protobuf::LogicalPlanNode =
- datafusion_proto::protobuf::LogicalPlanNode::try_from_logical_plan(
- &$initial_struct,
- codec.logical_extension_codec(),
- )
- .expect("from logical plan");
- let round_trip: LogicalPlan = proto
- .try_into_logical_plan(&ctx, codec.logical_extension_codec())
- .expect("to logical plan");
-
- assert_eq!(
- format!("{:?}", $initial_struct),
- format!("{:?}", round_trip)
- );
- };
- ($initial_struct:ident, $ctx:ident) => {
- let codec: BallistaCodec<
- protobuf::LogicalPlanNode,
- protobuf::PhysicalPlanNode,
- > = BallistaCodec::default();
- let proto: datafusion_proto::protobuf::LogicalPlanNode =
- protobuf::LogicalPlanNode::try_from_logical_plan(&$initial_struct)
- .expect("from logical plan");
- let round_trip: LogicalPlan = proto
- .try_into_logical_plan(&$ctx, codec.logical_extension_codec())
- .expect("to logical plan");
-
- assert_eq!(
- format!("{:?}", $initial_struct),
- format!("{:?}", round_trip)
- );
- };
- }
-
- #[tokio::test]
- async fn roundtrip_repartition() -> Result<()> {
- use datafusion::logical_plan::Partitioning;
-
- let test_partition_counts = [usize::MIN, usize::MAX, 43256];
-
- let test_expr: Vec<Expr> =
- vec![col("c1") + col("c2"), Expr::Literal((4.0).into())];
-
- let plan = std::sync::Arc::new(
- test_scan_csv("employee", Some(vec![3, 4]))
- .await?
- .sort(vec![col("salary")])?
- .build()?,
- );
-
- for partition_count in test_partition_counts.iter() {
- let rr_repartition = Partitioning::RoundRobinBatch(*partition_count);
-
- let roundtrip_plan = LogicalPlan::Repartition(Repartition {
- input: plan.clone(),
- partitioning_scheme: rr_repartition,
- });
-
- roundtrip_test!(roundtrip_plan);
-
- let h_repartition = Partitioning::Hash(test_expr.clone(), *partition_count);
-
- let roundtrip_plan = LogicalPlan::Repartition(Repartition {
- input: plan.clone(),
- partitioning_scheme: h_repartition,
- });
-
- roundtrip_test!(roundtrip_plan);
-
- let no_expr_hrepartition = Partitioning::Hash(Vec::new(), *partition_count);
-
- let roundtrip_plan = LogicalPlan::Repartition(Repartition {
- input: plan.clone(),
- partitioning_scheme: no_expr_hrepartition,
- });
-
- roundtrip_test!(roundtrip_plan);
- }
-
- Ok(())
- }
-
- #[test]
- fn roundtrip_create_external_table() -> Result<()> {
- let schema = test_schema();
-
- let df_schema_ref = schema.to_dfschema_ref()?;
-
- let filetypes: [FileType; 4] = [
- FileType::NdJson,
- FileType::Parquet,
- FileType::CSV,
- FileType::Avro,
- ];
-
- for file in filetypes.iter() {
- let create_table_node =
- LogicalPlan::CreateExternalTable(CreateExternalTable {
- schema: df_schema_ref.clone(),
- name: String::from("TestName"),
- location: String::from("employee.csv"),
- file_type: *file,
- has_header: true,
- delimiter: ',',
- table_partition_cols: vec![],
- if_not_exists: false,
- });
-
- roundtrip_test!(create_table_node);
- }
-
- Ok(())
- }
-
- #[tokio::test]
- async fn roundtrip_analyze() -> Result<()> {
- let verbose_plan = test_scan_csv("employee", Some(vec![3, 4]))
- .await?
- .sort(vec![col("salary")])?
- .explain(true, true)?
- .build()?;
-
- let plan = test_scan_csv("employee", Some(vec![3, 4]))
- .await?
- .sort(vec![col("salary")])?
- .explain(false, true)?
- .build()?;
-
- roundtrip_test!(plan);
-
- roundtrip_test!(verbose_plan);
-
- Ok(())
- }
-
- #[tokio::test]
- async fn roundtrip_explain() -> Result<()> {
- let verbose_plan = test_scan_csv("employee", Some(vec![3, 4]))
- .await?
- .sort(vec![col("salary")])?
- .explain(true, false)?
- .build()?;
-
- let plan = test_scan_csv("employee", Some(vec![3, 4]))
- .await?
- .sort(vec![col("salary")])?
- .explain(false, false)?
- .build()?;
-
- roundtrip_test!(plan);
-
- roundtrip_test!(verbose_plan);
-
- Ok(())
- }
-
- #[tokio::test]
- async fn roundtrip_join() -> Result<()> {
- let scan_plan = test_scan_csv("employee1", Some(vec![0, 3, 4]))
- .await?
- .build()?;
- let filter = binary_expr(col("employee1.x"), Operator::Gt, col("employee2.y"));
-
- let plan = test_scan_csv("employee2", Some(vec![0, 3, 4]))
- .await?
- .join(
- &scan_plan,
- JoinType::Inner,
- (vec!["id"], vec!["id"]),
- Some(filter),
- )?
- .build()?;
-
- roundtrip_test!(plan);
- Ok(())
- }
-
- #[tokio::test]
- async fn roundtrip_sort() -> Result<()> {
- let plan = test_scan_csv("employee", Some(vec![3, 4]))
- .await?
- .sort(vec![col("salary")])?
- .build()?;
- roundtrip_test!(plan);
-
- Ok(())
- }
-
- #[tokio::test]
- async fn roundtrip_empty_relation() -> Result<()> {
- let plan_false = LogicalPlanBuilder::empty(false).build()?;
-
- roundtrip_test!(plan_false);
-
- let plan_true = LogicalPlanBuilder::empty(true).build()?;
-
- roundtrip_test!(plan_true);
-
- Ok(())
- }
-
- #[tokio::test]
- async fn roundtrip_logical_plan() -> Result<()> {
- let plan = test_scan_csv("employee", Some(vec![3, 4]))
- .await?
- .aggregate(vec![col("state")], vec![max(col("salary"))])?
- .build()?;
-
- roundtrip_test!(plan);
-
- Ok(())
- }
-
- #[ignore] // see https://github.com/apache/arrow-datafusion/issues/2546
- #[tokio::test]
- async fn roundtrip_logical_plan_custom_ctx() -> Result<()> {
- let ctx = SessionContext::new();
- let codec: BallistaCodec<
- datafusion_proto::protobuf::LogicalPlanNode,
- protobuf::PhysicalPlanNode,
- > = BallistaCodec::default();
- let custom_object_store = Arc::new(TestObjectStore {});
- ctx.runtime_env()
- .register_object_store("test", custom_object_store.clone());
-
- let table_path = "test:///employee.csv";
- let url = ListingTableUrl::parse(table_path).unwrap();
- let os = ctx.runtime_env().object_store(&url)?;
- assert_eq!("TestObjectStore", &format!("{:?}", os));
- assert_eq!(table_path, &url.to_string());
-
- let schema = test_schema();
- let plan = ctx
- .read_csv(
- table_path,
- CsvReadOptions::new().schema(&schema).has_header(true),
- )
- .await?
- .to_logical_plan()?;
-
- let proto: datafusion_proto::protobuf::LogicalPlanNode =
- datafusion_proto::protobuf::LogicalPlanNode::try_from_logical_plan(
- &plan,
- codec.logical_extension_codec(),
- )
- .expect("from logical plan");
- let round_trip: LogicalPlan = proto
- .try_into_logical_plan(&ctx, codec.logical_extension_codec())
- .expect("to logical plan");
-
- assert_eq!(format!("{:?}", plan), format!("{:?}", round_trip));
-
- let table_path = match round_trip {
- LogicalPlan::TableScan(scan) => {
- let source = source_as_provider(&scan.source)?;
- match source.as_ref().as_any().downcast_ref::<ListingTable>() {
- Some(listing_table) => listing_table.table_path().clone(),
- _ => panic!("expected a ListingTable"),
- }
- }
- _ => panic!("expected a TableScan"),
- };
-
- assert_eq!(table_path.as_str(), url.as_str());
-
- Ok(())
- }
-
- fn test_schema() -> Schema {
- Schema::new(vec![
- Field::new("id", DataType::Int32, false),
- Field::new("first_name", DataType::Utf8, false),
- Field::new("last_name", DataType::Utf8, false),
- Field::new("state", DataType::Utf8, false),
- Field::new("salary", DataType::Int32, false),
- ])
- }
-
- async fn test_scan_csv(
- table_name: &str,
- projection: Option<Vec<usize>>,
- ) -> Result<LogicalPlanBuilder> {
- let schema = test_schema();
- let ctx = SessionContext::new();
- let options = CsvReadOptions::new().schema(&schema);
-
- let uri = format!("file:///{}.csv", table_name);
- ctx.register_csv(table_name, &uri, options).await?;
-
- let df = ctx.table(table_name)?;
- let plan = match df.to_logical_plan()? {
- LogicalPlan::TableScan(ref scan) => {
- let mut scan = scan.clone();
- scan.projection = projection;
- let mut projected_schema = scan.projected_schema.as_ref().clone();
- projected_schema = projected_schema.replace_qualifier(table_name);
- scan.projected_schema = DFSchemaRef::new(projected_schema);
- LogicalPlan::TableScan(scan)
- }
- _ => unimplemented!(),
- };
- Ok(LogicalPlanBuilder::from(plan))
- }
-}
diff --git a/ballista/rust/core/src/serde/mod.rs b/ballista/rust/core/src/serde/mod.rs
index a955c099..20979f12 100644
--- a/ballista/rust/core/src/serde/mod.rs
+++ b/ballista/rust/core/src/serde/mod.rs
@@ -20,7 +20,7 @@
use crate::{error::BallistaError, serde::scheduler::Action as BallistaAction};
use datafusion::execution::runtime_env::RuntimeEnv;
-use datafusion::logical_plan::{FunctionRegistry, JoinConstraint, JoinType, Operator};
+use datafusion::logical_plan::{FunctionRegistry, Operator};
use datafusion::physical_plan::join_utils::JoinSide;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_proto::logical_plan::{
@@ -39,7 +39,6 @@ pub mod protobuf {
include!(concat!(env!("OUT_DIR"), "/ballista.protobuf.rs"));
}
-pub mod logical_plan;
pub mod physical_plan;
pub mod scheduler;
@@ -221,32 +220,6 @@ pub(crate) fn from_proto_binary_op(op: &str) -> Result<Operator, BallistaError>
}
}
-impl From<protobuf::JoinType> for JoinType {
- fn from(t: protobuf::JoinType) -> Self {
- match t {
- protobuf::JoinType::Inner => JoinType::Inner,
- protobuf::JoinType::Left => JoinType::Left,
- protobuf::JoinType::Right => JoinType::Right,
- protobuf::JoinType::Full => JoinType::Full,
- protobuf::JoinType::Semi => JoinType::Semi,
- protobuf::JoinType::Anti => JoinType::Anti,
- }
- }
-}
-
-impl From<JoinType> for protobuf::JoinType {
- fn from(t: JoinType) -> Self {
- match t {
- JoinType::Inner => protobuf::JoinType::Inner,
- JoinType::Left => protobuf::JoinType::Left,
- JoinType::Right => protobuf::JoinType::Right,
- JoinType::Full => protobuf::JoinType::Full,
- JoinType::Semi => protobuf::JoinType::Semi,
- JoinType::Anti => protobuf::JoinType::Anti,
- }
- }
-}
-
impl From<protobuf::JoinSide> for JoinSide {
fn from(t: protobuf::JoinSide) -> Self {
match t {
@@ -265,24 +238,6 @@ impl From<JoinSide> for protobuf::JoinSide {
}
}
-impl From<protobuf::JoinConstraint> for JoinConstraint {
- fn from(t: protobuf::JoinConstraint) -> Self {
- match t {
- protobuf::JoinConstraint::On => JoinConstraint::On,
- protobuf::JoinConstraint::Using => JoinConstraint::Using,
- }
- }
-}
-
-impl From<JoinConstraint> for protobuf::JoinConstraint {
- fn from(t: JoinConstraint) -> Self {
- match t {
- JoinConstraint::On => protobuf::JoinConstraint::On,
- JoinConstraint::Using => protobuf::JoinConstraint::Using,
- }
- }
-}
-
fn byte_to_string(b: u8) -> Result<String, BallistaError> {
let b = &[b];
let b = std::str::from_utf8(b)
@@ -397,7 +352,7 @@ mod tests {
&self,
exprs: &[Expr],
inputs: &[LogicalPlan],
- ) -> Arc<dyn UserDefinedLogicalNode + Send + Sync> {
+ ) -> Arc<dyn UserDefinedLogicalNode> {
assert_eq!(inputs.len(), 1, "input size inconsistent");
assert_eq!(exprs.len(), 1, "expression size inconsistent");
Arc::new(TopKPlanNode {
@@ -494,9 +449,10 @@ mod tests {
struct TopKPlanner {}
+ #[async_trait]
impl ExtensionPlanner for TopKPlanner {
/// Create a physical plan for an extension node
- fn plan_extension(
+ async fn plan_extension(
&self,
_planner: &dyn PhysicalPlanner,
node: &dyn UserDefinedLogicalNode,
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index d699a011..6268ab28 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -21,13 +21,8 @@ use std::convert::{TryFrom, TryInto};
use std::ops::Deref;
use std::sync::Arc;
-use crate::error::BallistaError;
-
-use crate::convert_required;
-use crate::serde::{from_proto_binary_op, proto_error, protobuf};
use chrono::{TimeZone, Utc};
-
-use datafusion::datafusion_data_access::{FileMeta, SizedFile};
+use datafusion::arrow::datatypes::Schema;
use datafusion::datasource::listing::{FileRange, PartitionedFile};
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::execution::context::ExecutionProps;
@@ -43,9 +38,15 @@ use datafusion::physical_plan::{
functions, Partitioning,
};
use datafusion::physical_plan::{ColumnStatistics, PhysicalExpr, Statistics};
+use object_store::path::Path;
+use object_store::ObjectMeta;
use protobuf::physical_expr_node::ExprType;
+use crate::convert_required;
+use crate::error::BallistaError;
+use crate::serde::{from_proto_binary_op, proto_error, protobuf};
+
impl From<&protobuf::PhysicalColumn> for Column {
fn from(c: &protobuf::PhysicalColumn) -> Column {
Column::new(&c.name, c.index as usize)
@@ -55,6 +56,7 @@ impl From<&protobuf::PhysicalColumn> for Column {
pub(crate) fn parse_physical_expr(
proto: &protobuf::PhysicalExprNode,
registry: &dyn FunctionRegistry,
+ input_schema: &Schema,
) -> Result<Arc<dyn PhysicalExpr>, BallistaError> {
let expr_type = proto
.expr_type
@@ -70,9 +72,19 @@ pub(crate) fn parse_physical_expr(
Arc::new(Literal::new(convert_required!(scalar.value)?))
}
ExprType::BinaryExpr(binary_expr) => Arc::new(BinaryExpr::new(
- parse_required_physical_box_expr(&binary_expr.l, registry, "left")?,
+ parse_required_physical_box_expr(
+ &binary_expr.l,
+ registry,
+ "left",
+ input_schema,
+ )?,
from_proto_binary_op(&binary_expr.op)?,
- parse_required_physical_box_expr(&binary_expr.r, registry, "right")?,
+ parse_required_physical_box_expr(
+ &binary_expr.r,
+ registry,
+ "right",
+ input_schema,
+ )?,
)),
ExprType::AggregateExpr(_) => {
return Err(BallistaError::General(
@@ -90,29 +102,33 @@ pub(crate) fn parse_physical_expr(
));
}
ExprType::IsNullExpr(e) => Arc::new(IsNullExpr::new(
- parse_required_physical_box_expr(&e.expr, registry, "expr")?,
+ parse_required_physical_box_expr(&e.expr, registry, "expr", input_schema)?,
)),
ExprType::IsNotNullExpr(e) => Arc::new(IsNotNullExpr::new(
- parse_required_physical_box_expr(&e.expr, registry, "expr")?,
+ parse_required_physical_box_expr(&e.expr, registry, "expr", input_schema)?,
)),
ExprType::NotExpr(e) => Arc::new(NotExpr::new(parse_required_physical_box_expr(
- &e.expr, registry, "expr",
+ &e.expr,
+ registry,
+ "expr",
+ input_schema,
)?)),
ExprType::Negative(e) => Arc::new(NegativeExpr::new(
- parse_required_physical_box_expr(&e.expr, registry, "expr")?,
+ parse_required_physical_box_expr(&e.expr, registry, "expr", input_schema)?,
)),
ExprType::InList(e) => Arc::new(InListExpr::new(
- parse_required_physical_box_expr(&e.expr, registry, "expr")?,
+ parse_required_physical_box_expr(&e.expr, registry, "expr", input_schema)?,
e.list
.iter()
- .map(|x| parse_physical_expr(x, registry))
+ .map(|x| parse_physical_expr(x, registry, input_schema))
.collect::<Result<Vec<_>, _>>()?,
e.negated,
+ input_schema,
)),
ExprType::Case(e) => Arc::new(CaseExpr::try_new(
e.expr
.as_ref()
- .map(|e| parse_physical_expr(e.as_ref(), registry))
+ .map(|e| parse_physical_expr(e.as_ref(), registry, input_schema))
.transpose()?,
e.when_then_expr
.iter()
@@ -122,28 +138,29 @@ pub(crate) fn parse_physical_expr(
&e.when_expr,
registry,
"when_expr",
+ input_schema,
)?,
parse_required_physical_expr(
&e.then_expr,
registry,
"then_expr",
+ input_schema,
)?,
))
})
- .collect::<Result<Vec<_>, BallistaError>>()?
- .as_slice(),
+ .collect::<Result<Vec<_>, BallistaError>>()?,
e.else_expr
.as_ref()
- .map(|e| parse_physical_expr(e.as_ref(), registry))
+ .map(|e| parse_physical_expr(e.as_ref(), registry, input_schema))
.transpose()?,
)?),
ExprType::Cast(e) => Arc::new(CastExpr::new(
- parse_required_physical_box_expr(&e.expr, registry, "expr")?,
+ parse_required_physical_box_expr(&e.expr, registry, "expr", input_schema)?,
convert_required!(e.arrow_type)?,
DEFAULT_DATAFUSION_CAST_OPTIONS,
)),
ExprType::TryCast(e) => Arc::new(TryCastExpr::new(
- parse_required_physical_box_expr(&e.expr, registry, "expr")?,
+ parse_required_physical_box_expr(&e.expr, registry, "expr", input_schema)?,
convert_required!(e.arrow_type)?,
)),
ExprType::ScalarFunction(e) => {
@@ -157,7 +174,7 @@ pub(crate) fn parse_physical_expr(
let args = e
.args
.iter()
- .map(|x| parse_physical_expr(x, registry))
+ .map(|x| parse_physical_expr(x, registry, input_schema))
.collect::<Result<Vec<_>, _>>()?;
// TODO Do not create new the ExecutionProps
@@ -181,7 +198,7 @@ pub(crate) fn parse_physical_expr(
let args = e
.args
.iter()
- .map(|x| parse_physical_expr(x, registry))
+ .map(|x| parse_physical_expr(x, registry, input_schema))
.collect::<Result<Vec<_>, _>>()?;
Arc::new(ScalarFunctionExpr::new(
@@ -200,9 +217,10 @@ fn parse_required_physical_box_expr(
expr: &Option<Box<protobuf::PhysicalExprNode>>,
registry: &dyn FunctionRegistry,
field: &str,
+ input_schema: &Schema,
) -> Result<Arc<dyn PhysicalExpr>, BallistaError> {
expr.as_ref()
- .map(|e| parse_physical_expr(e.as_ref(), registry))
+ .map(|e| parse_physical_expr(e.as_ref(), registry, input_schema))
.transpose()?
.ok_or_else(|| {
BallistaError::General(format!("Missing required field {:?}", field))
@@ -213,9 +231,10 @@ fn parse_required_physical_expr(
expr: &Option<protobuf::PhysicalExprNode>,
registry: &dyn FunctionRegistry,
field: &str,
+ input_schema: &Schema,
) -> Result<Arc<dyn PhysicalExpr>, BallistaError> {
expr.as_ref()
- .map(|e| parse_physical_expr(e, registry))
+ .map(|e| parse_physical_expr(e, registry, input_schema))
.transpose()?
.ok_or_else(|| {
BallistaError::General(format!("Missing required field {:?}", field))
@@ -258,13 +277,14 @@ impl TryFrom<&protobuf::physical_window_expr_node::WindowFunction> for WindowFun
pub fn parse_protobuf_hash_partitioning(
partitioning: Option<&protobuf::PhysicalHashRepartition>,
registry: &dyn FunctionRegistry,
+ input_schema: &Schema,
) -> Result<Option<Partitioning>, BallistaError> {
match partitioning {
Some(hash_part) => {
let expr = hash_part
.hash_expr
.iter()
- .map(|e| parse_physical_expr(e, registry))
+ .map(|e| parse_physical_expr(e, registry, input_schema))
.collect::<Result<Vec<Arc<dyn PhysicalExpr>>, _>>()?;
Ok(Some(Partitioning::Hash(
@@ -281,16 +301,10 @@ impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
fn try_from(val: &protobuf::PartitionedFile) -> Result<Self, Self::Error> {
Ok(PartitionedFile {
- file_meta: FileMeta {
- sized_file: SizedFile {
- path: val.path.clone(),
- size: val.size,
- },
- last_modified: if val.last_modified_ns == 0 {
- None
- } else {
- Some(Utc.timestamp_nanos(val.last_modified_ns as i64))
- },
+ object_meta: ObjectMeta {
+ location: Path::from(val.path.as_str()),
+ last_modified: Utc.timestamp_nanos(val.last_modified_ns as i64),
+ size: val.size as usize,
},
partition_values: val
.partition_values
diff --git a/ballista/rust/core/src/serde/physical_plan/mod.rs b/ballista/rust/core/src/serde/physical_plan/mod.rs
index ac3cb22e..b80e64b1 100644
--- a/ballista/rust/core/src/serde/physical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/physical_plan/mod.rs
@@ -127,7 +127,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
.expr
.iter()
.zip(projection.expr_name.iter())
- .map(|(expr, name)| Ok((parse_physical_expr(expr,registry)?, name.to_string())))
+ .map(|(expr, name)| Ok((parse_physical_expr(expr,registry, input.schema().as_ref())?, name.to_string())))
.collect::<Result<Vec<(Arc<dyn PhysicalExpr>, String)>, BallistaError>>(
)?;
Ok(Arc::new(ProjectionExec::try_new(exprs, input)?))
@@ -142,7 +142,9 @@ impl AsExecutionPlan for PhysicalPlanNode {
let predicate = filter
.expr
.as_ref()
- .map(|expr| parse_physical_expr(expr, registry))
+ .map(|expr| {
+ parse_physical_expr(expr, registry, input.schema().as_ref())
+ })
.transpose()?
.ok_or_else(|| {
BallistaError::General(
@@ -200,7 +202,9 @@ impl AsExecutionPlan for PhysicalPlanNode {
let expr = hash_part
.hash_expr
.iter()
- .map(|e| parse_physical_expr(e, registry))
+ .map(|e| {
+ parse_physical_expr(e, registry, input.schema().as_ref())
+ })
.collect::<Result<Vec<Arc<dyn PhysicalExpr>>, _>>()?;
Ok(Arc::new(RepartitionExec::try_new(
@@ -285,7 +289,13 @@ impl AsExecutionPlan for PhysicalPlanNode {
let window_node_expr = window_node
.expr
.as_ref()
- .map(|e| parse_physical_expr(e.as_ref(), registry))
+ .map(|e| {
+ parse_physical_expr(
+ e.as_ref(),
+ registry,
+ &physical_schema,
+ )
+ })
.transpose()?
.ok_or_else(|| {
proto_error(
@@ -347,7 +357,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
.iter()
.zip(hash_agg.group_expr_name.iter())
.map(|(expr, name)| {
- parse_physical_expr(expr, registry)
+ parse_physical_expr(expr, registry, input.schema().as_ref())
.map(|expr| (expr, name.to_string()))
})
.collect::<Result<Vec<_>, _>>()?;
@@ -357,7 +367,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
.iter()
.zip(hash_agg.group_expr_name.iter())
.map(|(expr, name)| {
- parse_physical_expr(expr, registry)
+ parse_physical_expr(expr, registry, input.schema().as_ref())
.map(|expr| (expr, name.to_string()))
})
.collect::<Result<Vec<_>, _>>()?;
@@ -409,7 +419,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
)?;
let input_phy_expr: Vec<Arc<dyn PhysicalExpr>> = agg_node.expr.iter()
- .map(|e| parse_physical_expr(e, registry).unwrap()).collect();
+ .map(|e| parse_physical_expr(e, registry, &physical_schema).unwrap()).collect();
Ok(create_aggregate_expr(
&aggr_function.into(),
@@ -457,22 +467,29 @@ impl AsExecutionPlan for PhysicalPlanNode {
Ok((left, right))
})
.collect::<Result<_, BallistaError>>()?;
- let join_type = protobuf::JoinType::from_i32(hashjoin.join_type)
- .ok_or_else(|| {
- proto_error(format!(
+ let join_type =
+ datafusion_proto::protobuf::JoinType::from_i32(hashjoin.join_type)
+ .ok_or_else(|| {
+ proto_error(format!(
"Received a HashJoinNode message with unknown JoinType {}",
hashjoin.join_type
))
- })?;
+ })?;
let filter = hashjoin
.filter
.as_ref()
.map(|f| {
+ let schema = f
+ .schema
+ .as_ref()
+ .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
+ .try_into()?;
+
let expression = parse_physical_expr(
f.expression.as_ref().ok_or_else(|| {
proto_error("Unexpected empty filter expression")
})?,
- registry,
+ registry, &schema
)?;
let column_indices = f.column_indices
.iter()
@@ -489,11 +506,6 @@ impl AsExecutionPlan for PhysicalPlanNode {
})
})
.collect::<Result<Vec<_>, BallistaError>>()?;
- let schema = f
- .schema
- .as_ref()
- .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
- .try_into()?;
Ok(JoinFilter::new(expression, column_indices, schema))
})
@@ -558,6 +570,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
let output_partitioning = parse_protobuf_hash_partitioning(
shuffle_writer.output_partitioning.as_ref(),
registry,
+ input.schema().as_ref(),
)?;
Ok(Arc::new(ShuffleWriterExec::try_new(
@@ -613,7 +626,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
})?
.as_ref();
Ok(PhysicalSortExpr {
- expr: parse_physical_expr(expr,registry)?,
+ expr: parse_physical_expr(expr,registry, input.schema().as_ref())?,
options: SortOptions {
descending: !sort_expr.asc,
nulls_first: sort_expr.nulls_first,
@@ -766,7 +779,8 @@ impl AsExecutionPlan for PhysicalPlanNode {
}),
})
.collect();
- let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
+ let join_type: datafusion_proto::protobuf::JoinType =
+ exec.join_type().to_owned().into();
let filter = exec
.filter()
.as_ref()
@@ -1372,6 +1386,7 @@ mod roundtrip_tests {
lit(ScalarValue::Int64(Some(2))),
],
false,
+ schema.as_ref(),
));
let and = binary(not, Operator::And, in_list, &schema)?;
roundtrip_test(Arc::new(FilterExec::try_new(
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index 7896ddde..16195377 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -349,13 +349,9 @@ impl TryFrom<&PartitionedFile> for protobuf::PartitionedFile {
fn try_from(pf: &PartitionedFile) -> Result<Self, Self::Error> {
Ok(protobuf::PartitionedFile {
- path: pf.file_meta.path().to_owned(),
- size: pf.file_meta.size(),
- last_modified_ns: pf
- .file_meta
- .last_modified
- .map(|ts| ts.timestamp_nanos() as u64)
- .unwrap_or(0),
+ path: pf.object_meta.location.as_ref().to_owned(),
+ size: pf.object_meta.size as u64,
+ last_modified_ns: pf.object_meta.last_modified.timestamp_nanos() as u64,
partition_values: pf
.partition_values
.iter()
diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml
index 9b32fb8b..d05138ce 100644
--- a/ballista/rust/executor/Cargo.toml
+++ b/ballista/rust/executor/Cargo.toml
@@ -34,14 +34,14 @@ snmalloc = ["snmalloc-rs"]
[dependencies]
anyhow = "1"
-arrow = { version = "16.0.0" }
-arrow-flight = { version = "16.0.0" }
+arrow = { version = "18.0.0" }
+arrow-flight = { version = "18.0.0" }
async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.7.0" }
chrono = { version = "0.4", default-features = false }
configure_me = "0.4.0"
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
env_logger = "0.9"
futures = "0.3"
hyper = "0.14.4"
diff --git a/ballista/rust/executor/src/execution_loop.rs b/ballista/rust/executor/src/execution_loop.rs
index 1e382b02..0520ab65 100644
--- a/ballista/rust/executor/src/execution_loop.rs
+++ b/ballista/rust/executor/src/execution_loop.rs
@@ -178,6 +178,7 @@ async fn run_received_tasks<T: 'static + AsLogicalPlan, U: 'static + AsExecution
let shuffle_output_partitioning = parse_protobuf_hash_partitioning(
task.output_partitioning.as_ref(),
task_context.as_ref(),
+ plan.schema().as_ref(),
)?;
tokio::spawn(async move {
diff --git a/ballista/rust/executor/src/executor_server.rs b/ballista/rust/executor/src/executor_server.rs
index de202a54..158c708a 100644
--- a/ballista/rust/executor/src/executor_server.rs
+++ b/ballista/rust/executor/src/executor_server.rs
@@ -220,6 +220,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
let shuffle_output_partitioning = parse_protobuf_hash_partitioning(
task.output_partitioning.as_ref(),
task_context.as_ref(),
+ plan.schema().as_ref(),
)?;
let execution_result = self
diff --git a/ballista/rust/scheduler/Cargo.toml b/ballista/rust/scheduler/Cargo.toml
index 5da74bad..8ff40930 100644
--- a/ballista/rust/scheduler/Cargo.toml
+++ b/ballista/rust/scheduler/Cargo.toml
@@ -41,8 +41,8 @@ async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.7.0" }
clap = { version = "3", features = ["derive", "cargo"] }
configure_me = "0.4.0"
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
env_logger = "0.9"
etcd-client = { version = "0.9", optional = true }
futures = "0.3"
@@ -50,6 +50,7 @@ http = "0.2"
http-body = "0.4"
hyper = "0.14.4"
log = "0.4"
+object_store = "0.3.0"
parking_lot = "0.12"
parse_arg = "0.1.3"
prost = "0.10"
diff --git a/ballista/rust/scheduler/src/scheduler_server/grpc.rs b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
index 01c2058b..0cea1be4 100644
--- a/ballista/rust/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
@@ -15,12 +15,25 @@
// specific language governing permissions and limitations
// under the License.
-use crate::scheduler_server::event::QueryStageSchedulerEvent;
-use crate::scheduler_server::{
- create_datafusion_context, update_datafusion_context, SchedulerServer,
-};
-use crate::state::task_scheduler::TaskScheduler;
+use std::convert::TryInto;
+use std::ops::Deref;
+use std::sync::Arc;
+use std::time::Instant;
+use std::time::{SystemTime, UNIX_EPOCH};
+
use anyhow::Context;
+use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::datasource::file_format::FileFormat;
+use datafusion_proto::logical_plan::AsLogicalPlan;
+use datafusion_proto::protobuf::FileType;
+use futures::TryStreamExt;
+use log::{debug, error, info, trace, warn};
+use object_store::local::LocalFileSystem;
+use object_store::path::Path;
+use object_store::ObjectStore;
+use rand::{distributions::Alphanumeric, thread_rng, Rng};
+use tonic::{Request, Response, Status};
+
use ballista_core::config::{BallistaConfig, TaskSchedulingPolicy};
use ballista_core::error::BallistaError;
use ballista_core::serde::protobuf::execute_query_params::{OptionalSessionId, Query};
@@ -29,30 +42,21 @@ use ballista_core::serde::protobuf::executor_registration::OptionalHost;
use ballista_core::serde::protobuf::scheduler_grpc_server::SchedulerGrpc;
use ballista_core::serde::protobuf::{
job_status, ExecuteQueryParams, ExecuteQueryResult, ExecutorHeartbeat, FailedJob,
- FileType, GetFileMetadataParams, GetFileMetadataResult, GetJobStatusParams,
- GetJobStatusResult, HeartBeatParams, HeartBeatResult, JobStatus, PollWorkParams,
- PollWorkResult, QueuedJob, RegisterExecutorParams, RegisterExecutorResult,
- UpdateTaskStatusParams, UpdateTaskStatusResult,
+ GetFileMetadataParams, GetFileMetadataResult, GetJobStatusParams, GetJobStatusResult,
+ HeartBeatParams, HeartBeatResult, JobStatus, PollWorkParams, PollWorkResult,
+ QueuedJob, RegisterExecutorParams, RegisterExecutorResult, UpdateTaskStatusParams,
+ UpdateTaskStatusResult,
};
use ballista_core::serde::scheduler::{
ExecutorData, ExecutorDataChange, ExecutorMetadata,
};
use ballista_core::serde::AsExecutionPlan;
-use datafusion::datafusion_data_access::object_store::{
- local::LocalFileSystem, ObjectStore,
+
+use crate::scheduler_server::event::QueryStageSchedulerEvent;
+use crate::scheduler_server::{
+ create_datafusion_context, update_datafusion_context, SchedulerServer,
};
-use datafusion::datasource::file_format::parquet::ParquetFormat;
-use datafusion::datasource::file_format::FileFormat;
-use datafusion_proto::logical_plan::AsLogicalPlan;
-use futures::TryStreamExt;
-use log::{debug, error, info, trace, warn};
-use rand::{distributions::Alphanumeric, thread_rng, Rng};
-use std::convert::TryInto;
-use std::ops::Deref;
-use std::sync::Arc;
-use std::time::Instant;
-use std::time::{SystemTime, UNIX_EPOCH};
-use tonic::{Request, Response, Status};
+use crate::state::task_scheduler::TaskScheduler;
#[tonic::async_trait]
impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
@@ -281,7 +285,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
request: Request<GetFileMetadataParams>,
) -> std::result::Result<Response<GetFileMetadataResult>, tonic::Status> {
// TODO support multiple object stores
- let obj_store = Arc::new(LocalFileSystem {}) as Arc<dyn ObjectStore>;
+ let obj_store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
// TODO shouldn't this take a ListingOption object as input?
let GetFileMetadataParams { path, file_type } = request.into_inner();
@@ -300,8 +304,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
)),
}?;
+ let path = Path::from(path.as_str());
let file_metas: Vec<_> = obj_store
- .list_file(&path)
+ .list(Some(&path))
.await
.map_err(|e| {
let msg = format!("Error listing files: {}", e);
@@ -309,7 +314,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
tonic::Status::internal(msg)
})?
.try_collect()
- .await?;
+ .await
+ .map_err(|e| {
+ let msg = format!("Error listing files: {}", e);
+ error!("{}", msg);
+ tonic::Status::internal(msg)
+ })?;
let schema = file_format
.infer_schema(&obj_store, &file_metas)
@@ -556,9 +566,10 @@ fn generate_job_id() -> String {
mod test {
use std::sync::Arc;
+ use datafusion::execution::context::default_session_builder;
+ use datafusion_proto::protobuf::LogicalPlanNode;
use tonic::Request;
- use crate::state::{backend::standalone::StandaloneClient, SchedulerState};
use ballista_core::error::BallistaError;
use ballista_core::serde::protobuf::{
executor_registration::OptionalHost, ExecutorRegistration, PhysicalPlanNode,
@@ -566,8 +577,8 @@ mod test {
};
use ballista_core::serde::scheduler::ExecutorSpecification;
use ballista_core::serde::BallistaCodec;
- use datafusion::execution::context::default_session_builder;
- use datafusion_proto::protobuf::LogicalPlanNode;
+
+ use crate::state::{backend::standalone::StandaloneClient, SchedulerState};
use super::{SchedulerGrpc, SchedulerServer};
diff --git a/ballista/rust/scheduler/src/scheduler_server/mod.rs b/ballista/rust/scheduler/src/scheduler_server/mod.rs
index 68dede14..10f26b1d 100644
--- a/ballista/rust/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/mod.rs
@@ -15,25 +15,28 @@
// specific language governing permissions and limitations
// under the License.
-use crate::scheduler_server::event::{QueryStageSchedulerEvent, SchedulerServerEvent};
-use crate::scheduler_server::event_loop::SchedulerServerEventAction;
-use crate::scheduler_server::query_stage_scheduler::QueryStageScheduler;
-use crate::state::backend::StateBackendClient;
-use crate::state::SchedulerState;
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::{SystemTime, UNIX_EPOCH};
+
+use datafusion::execution::context::{default_session_builder, SessionState};
+use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_proto::logical_plan::AsLogicalPlan;
+use tokio::sync::RwLock;
+use tonic::transport::Channel;
+
use ballista_core::config::{BallistaConfig, TaskSchedulingPolicy};
use ballista_core::error::Result;
use ballista_core::event_loop::EventLoop;
use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient;
use ballista_core::serde::protobuf::TaskStatus;
use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
-use datafusion::execution::context::{default_session_builder, SessionState};
-use datafusion::prelude::{SessionConfig, SessionContext};
-use datafusion_proto::logical_plan::AsLogicalPlan;
-use std::collections::HashMap;
-use std::sync::Arc;
-use std::time::{SystemTime, UNIX_EPOCH};
-use tokio::sync::RwLock;
-use tonic::transport::Channel;
+
+use crate::scheduler_server::event::{QueryStageSchedulerEvent, SchedulerServerEvent};
+use crate::scheduler_server::event_loop::SchedulerServerEventAction;
+use crate::scheduler_server::query_stage_scheduler::QueryStageScheduler;
+use crate::state::backend::StateBackendClient;
+use crate::state::SchedulerState;
// include the generated protobuf source as a submodule
#[allow(clippy::all)]
@@ -221,12 +224,14 @@ pub fn update_datafusion_context(
) -> Arc<SessionContext> {
{
let mut mut_state = session_ctx.state.write();
- mut_state.config.target_partitions = config.default_shuffle_partitions();
- mut_state.config.batch_size = config.default_batch_size();
- mut_state.config.repartition_joins = config.repartition_joins();
- mut_state.config.repartition_aggregations = config.repartition_aggregations();
- mut_state.config.repartition_windows = config.repartition_windows();
- mut_state.config.parquet_pruning = config.parquet_pruning();
+ // TODO: Currently we have to start from the default session config because the interface does not support updating an existing config
+ mut_state.config = SessionConfig::default()
+ .with_target_partitions(config.default_shuffle_partitions())
+ .with_batch_size(config.default_batch_size())
+ .with_repartition_joins(config.repartition_joins())
+ .with_repartition_aggregations(config.repartition_aggregations())
+ .with_repartition_windows(config.repartition_windows())
+ .with_parquet_pruning(config.parquet_pruning());
}
session_ctx
}
@@ -277,11 +282,19 @@ impl SessionContextRegistry {
sessions.remove(session_id)
}
}
+
#[cfg(all(test, feature = "sled"))]
mod test {
use std::sync::Arc;
use std::time::{Duration, Instant};
+ use datafusion::arrow::datatypes::{DataType, Field, Schema};
+ use datafusion::execution::context::default_session_builder;
+ use datafusion::logical_plan::{col, sum, LogicalPlan};
+ use datafusion::prelude::{SessionConfig, SessionContext};
+ use datafusion::test_util::scan_empty;
+ use datafusion_proto::protobuf::LogicalPlanNode;
+
use ballista_core::config::TaskSchedulingPolicy;
use ballista_core::error::{BallistaError, Result};
use ballista_core::execution_plans::ShuffleWriterExec;
@@ -290,12 +303,6 @@ mod test {
};
use ballista_core::serde::scheduler::ExecutorData;
use ballista_core::serde::BallistaCodec;
- use datafusion::arrow::datatypes::{DataType, Field, Schema};
- use datafusion::execution::context::default_session_builder;
- use datafusion::logical_plan::{col, sum, LogicalPlan};
- use datafusion::prelude::{SessionConfig, SessionContext};
- use datafusion::test_util::scan_empty;
- use datafusion_proto::protobuf::LogicalPlanNode;
use crate::scheduler_server::event::QueryStageSchedulerEvent;
use crate::scheduler_server::SchedulerServer;
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 9b12889b..d2797322 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -33,8 +33,8 @@ snmalloc = ["snmalloc-rs"]
[dependencies]
ballista = { path = "../ballista/rust/client" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
env_logger = "0.9"
futures = "0.3"
mimalloc = { version = "0.1", optional = true, default-features = false }
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index 5070aed2..c20194ac 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -35,7 +35,7 @@ required-features = ["ballista/standalone"]
[dependencies]
ballista = { path = "../ballista/rust/client", version = "0.7.0" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
futures = "0.3"
num_cpus = "1.13.0"
prost = "0.10"