Posted to commits@arrow.apache.org by ag...@apache.org on 2022/07/13 19:30:57 UTC

[arrow-ballista] branch master updated: Use latest DataFusion (#86)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new 56ec4dff Use latest DataFusion (#86)
56ec4dff is described below

commit 56ec4dff02ed1b4e801842a66bfe151eaf461fce
Author: yahoNanJing <90...@users.noreply.github.com>
AuthorDate: Thu Jul 14 03:30:52 2022 +0800

    Use latest DataFusion (#86)
    
    * Update datafusion dependency to commit d0d5564b8f689a01e542b8c1df829d74d0fab2b0
    
    * Fix inconsistency
    
    * Use latest DataFusion
    
    * Fix tomlfmt
    
    * Fix PR review
    
    Co-authored-by: yangzhong <ya...@ebay.com>
---
 Cargo.toml                                         |   2 +-
 ballista-cli/Cargo.toml                            |   4 +-
 ballista/rust/client/Cargo.toml                    |   4 +-
 ballista/rust/client/src/context.rs                |  14 +-
 ballista/rust/core/Cargo.toml                      |   7 +-
 ballista/rust/core/proto/ballista.proto            | 223 +--------
 ballista/rust/core/proto/datafusion.proto          | 516 ++++++++++++++++-----
 .../rust/core/src/serde/logical_plan/from_proto.rs |  52 ---
 ballista/rust/core/src/serde/logical_plan/mod.rs   | 424 -----------------
 ballista/rust/core/src/serde/mod.rs                |  52 +--
 .../core/src/serde/physical_plan/from_proto.rs     |  84 ++--
 ballista/rust/core/src/serde/physical_plan/mod.rs  |  53 ++-
 .../rust/core/src/serde/physical_plan/to_proto.rs  |  10 +-
 ballista/rust/executor/Cargo.toml                  |   8 +-
 ballista/rust/executor/src/execution_loop.rs       |   1 +
 ballista/rust/executor/src/executor_server.rs      |   1 +
 ballista/rust/scheduler/Cargo.toml                 |   5 +-
 .../rust/scheduler/src/scheduler_server/grpc.rs    |  69 +--
 .../rust/scheduler/src/scheduler_server/mod.rs     |  57 ++-
 benchmarks/Cargo.toml                              |   4 +-
 examples/Cargo.toml                                |   2 +-
 21 files changed, 594 insertions(+), 998 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 9b9e7932..fc4c25ca 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -28,7 +28,7 @@ exclude = ["ballista-cli", "python"]
 
 # cargo build --profile release-lto
 [profile.release-lto]
-inherits = "release"
 codegen-units = 1
+inherits = "release"
 lto = true
 
diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index db0ad6b7..11c2e772 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -31,8 +31,8 @@ readme = "README.md"
 [dependencies]
 ballista = { path = "../ballista/rust/client", version = "0.7.0" }
 clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
-datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
+datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
 dirs = "4.0.0"
 env_logger = "0.9"
 mimalloc = { version = "0.1", default-features = false }
diff --git a/ballista/rust/client/Cargo.toml b/ballista/rust/client/Cargo.toml
index f20ed712..9cf4eb33 100644
--- a/ballista/rust/client/Cargo.toml
+++ b/ballista/rust/client/Cargo.toml
@@ -31,8 +31,8 @@ rust-version = "1.59"
 ballista-core = { path = "../core", version = "0.7.0" }
 ballista-executor = { path = "../executor", version = "0.7.0", optional = true }
 ballista-scheduler = { path = "../scheduler", version = "0.7.0", optional = true }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
 futures = "0.3"
 log = "0.4"
 parking_lot = "0.12"
diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs
index 09ff9cee..03cd0799 100644
--- a/ballista/rust/client/src/context.rs
+++ b/ballista/rust/client/src/context.rs
@@ -440,6 +440,8 @@ impl BallistaContext {
 
 #[cfg(test)]
 mod tests {
+    #[cfg(feature = "standalone")]
+    use datafusion::datasource::listing::ListingTableUrl;
 
     #[tokio::test]
     #[cfg(feature = "standalone")]
@@ -591,10 +593,14 @@ mod tests {
                         target_partitions: x.target_partitions,
                     };
 
-                    let config =
-                        ListingTableConfig::new(listing_table.table_path().clone())
-                            .with_schema(Arc::new(Schema::new(vec![])))
-                            .with_listing_options(error_options);
+                    let table_paths = listing_table
+                        .table_paths()
+                        .iter()
+                        .map(|t| ListingTableUrl::parse(t).unwrap())
+                        .collect();
+                    let config = ListingTableConfig::new_with_multi_paths(table_paths)
+                        .with_schema(Arc::new(Schema::new(vec![])))
+                        .with_listing_options(error_options);
 
                     let error_table = ListingTable::try_new(config).unwrap();
 
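The context.rs hunk above tracks a DataFusion API migration: ListingTableConfig::new took a single table path, while the newer ListingTableConfig::new_with_multi_paths accepts a Vec of ListingTableUrl values, so string paths must be parsed first. A minimal sketch of the new construction path, assuming the listing APIs at this DataFusion revision (the empty schema and omitted ListingOptions mirror the test above, which expects the table to be unusable):

    use std::sync::Arc;

    use datafusion::arrow::datatypes::Schema;
    use datafusion::datasource::listing::{
        ListingTable, ListingTableConfig, ListingTableUrl,
    };
    use datafusion::error::Result;

    // Build a ListingTable over several locations at once, mirroring the
    // new_with_multi_paths call in the hunk above.
    fn multi_path_table(paths: &[&str]) -> Result<ListingTable> {
        // Each string path is parsed into a ListingTableUrl first.
        let table_paths = paths
            .iter()
            .map(|p| ListingTableUrl::parse(p))
            .collect::<Result<Vec<_>>>()?;
        let config = ListingTableConfig::new_with_multi_paths(table_paths)
            .with_schema(Arc::new(Schema::new(vec![])));
        // A production table would also call .with_listing_options(...)
        // before try_new, as the test above does.
        ListingTable::try_new(config)
    }
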
diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml
index a3803ab5..32f1c47e 100644
--- a/ballista/rust/core/Cargo.toml
+++ b/ballista/rust/core/Cargo.toml
@@ -35,17 +35,18 @@ simd = ["datafusion/simd"]
 [dependencies]
 ahash = { version = "0.7", default-features = false }
 
-arrow-flight = { version = "16.0.0" }
+arrow-flight = { version = "18.0.0" }
 async-trait = "0.1.41"
 chrono = { version = "0.4", default-features = false }
 clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
 futures = "0.3"
 hashbrown = "0.12"
 
 libloading = "0.7.3"
 log = "0.4"
+object_store = "0.3.0"
 once_cell = "1.9.0"
 
 parking_lot = "0.12"
diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 0af951cf..f899a603 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -21,7 +21,7 @@ syntax = "proto3";
 package ballista.protobuf;
 
 option java_multiple_files = true;
-option java_package = "org.ballistacompute.protobuf";
+option java_package = "org.apache.arrow.ballista.protobuf";
 option java_outer_classname = "BallistaProto";
 
 import "datafusion.proto";
@@ -30,43 +30,6 @@ import "datafusion.proto";
 // Ballista Logical Plan
 ///////////////////////////////////////////////////////////////////////////////////////////////////
 
-// LogicalPlan is a nested type
-message LogicalPlanNode {
-  oneof LogicalPlanType {
-    ListingTableScanNode listing_scan = 1;
-    ProjectionNode projection = 3;
-    SelectionNode selection = 4;
-    LimitNode limit = 5;
-    AggregateNode aggregate = 6;
-    JoinNode join = 7;
-    SortNode sort = 8;
-    RepartitionNode repartition = 9;
-    EmptyRelationNode empty_relation = 10;
-    CreateExternalTableNode create_external_table = 11;
-    ExplainNode explain = 12;
-    WindowNode window = 13;
-    AnalyzeNode analyze = 14;
-    CrossJoinNode cross_join = 15;
-    ValuesNode values = 16;
-    LogicalExtensionNode extension = 17;
-    CreateCatalogSchemaNode create_catalog_schema = 18;
-    UnionNode union = 19;
-    CreateCatalogNode create_catalog = 20;
-    SubqueryAliasNode subquery_alias = 21;
-    CreateViewNode create_view = 22;
-    OffsetNode offset = 23;
-  }
-}
-
-message LogicalExtensionNode {
-  bytes node = 1;
-  repeated LogicalPlanNode inputs = 2;
-}
-
-message ProjectionColumns {
-  repeated string columns = 1;
-}
-
 message Statistics {
   int64 num_rows = 1;
   int64 total_byte_size = 2;
@@ -87,186 +50,6 @@ message PartitionedFile {
   FileRange range = 5;
 }
 
-message CsvFormat {
-  bool has_header = 1;
-  string delimiter = 2;
-}
-
-message ParquetFormat {
-  bool enable_pruning = 1;
-}
-
-message AvroFormat {}
-
-message ListingTableScanNode {
-  string table_name = 1;
-  string path = 2;
-  string file_extension = 3;
-  ProjectionColumns projection = 4;
-  datafusion.Schema schema = 5;
-  repeated datafusion.LogicalExprNode filters = 6;
-  repeated string table_partition_cols = 7;
-  bool collect_stat = 8;
-  uint32 target_partitions = 9;
-  oneof FileFormatType {
-    CsvFormat csv = 10;
-    ParquetFormat parquet = 11;
-    AvroFormat avro = 12;
-  }
-}
-
-message ProjectionNode {
-  LogicalPlanNode input = 1;
-  repeated datafusion.LogicalExprNode expr = 2;
-  oneof optional_alias {
-    string alias = 3;
-  }
-}
-
-message SelectionNode {
-  LogicalPlanNode input = 1;
-  datafusion.LogicalExprNode expr = 2;
-}
-
-message SortNode {
-  LogicalPlanNode input = 1;
-  repeated datafusion.LogicalExprNode expr = 2;
-}
-
-message RepartitionNode {
-  LogicalPlanNode input = 1;
-  oneof partition_method {
-    uint64 round_robin = 2;
-    HashRepartition hash = 3;
-  }
-}
-
-message HashRepartition {
-  repeated datafusion.LogicalExprNode hash_expr = 1;
-  uint64 partition_count = 2;
-}
-
-message EmptyRelationNode {
-  bool produce_one_row = 1;
-}
-
-message CreateExternalTableNode {
-  string name = 1;
-  string location = 2;
-  FileType file_type = 3;
-  bool has_header = 4;
-  datafusion.DfSchema schema = 5;
-  repeated string table_partition_cols = 6;
-  bool if_not_exists = 7;
-  string delimiter = 8;
-}
-
-message CreateCatalogSchemaNode {
-  string schema_name = 1;
-  bool if_not_exists = 2;
-  datafusion.DfSchema schema = 3;
-}
-
-message CreateCatalogNode {
-  string catalog_name = 1;
-  bool if_not_exists = 2;
-  datafusion.DfSchema schema = 3;
-}
-
-message CreateViewNode {
-  string name = 1;
-  LogicalPlanNode input = 2;
-  bool or_replace = 3;
-}
-
-// a node containing data for defining values list. unlike in SQL where it's two dimensional, here
-// the list is flattened, and with the field n_cols it can be parsed and partitioned into rows
-message ValuesNode {
-  uint64 n_cols = 1;
-  repeated datafusion.LogicalExprNode values_list = 2;
-}
-
-enum FileType {
-  NdJson = 0;
-  Parquet = 1;
-  CSV = 2;
-  Avro = 3;
-}
-
-message AnalyzeNode {
-  LogicalPlanNode input = 1;
-  bool verbose = 2;
-}
-
-message ExplainNode {
-  LogicalPlanNode input = 1;
-  bool verbose = 2;
-}
-
-message AggregateNode {
-  LogicalPlanNode input = 1;
-  repeated datafusion.LogicalExprNode group_expr = 2;
-  repeated datafusion.LogicalExprNode aggr_expr = 3;
-}
-
-message WindowNode {
-  LogicalPlanNode input = 1;
-  repeated datafusion.LogicalExprNode window_expr = 2;
-}
-
-enum JoinType {
-  INNER = 0;
-  LEFT = 1;
-  RIGHT = 2;
-  FULL = 3;
-  SEMI = 4;
-  ANTI = 5;
-}
-
-enum JoinConstraint {
-  ON = 0;
-  USING = 1;
-}
-
-message JoinNode {
-  LogicalPlanNode left = 1;
-  LogicalPlanNode right = 2;
-  JoinType join_type = 3;
-  JoinConstraint join_constraint = 4;
-  repeated datafusion.Column left_join_column = 5;
-  repeated datafusion.Column right_join_column = 6;
-  bool null_equals_null = 7;
-  datafusion.LogicalExprNode filter = 8;
-}
-
-message UnionNode {
-  repeated LogicalPlanNode inputs = 1;
-}
-
-message CrossJoinNode {
-  LogicalPlanNode left = 1;
-  LogicalPlanNode right = 2;
-}
-
-message LimitNode {
-  LogicalPlanNode input = 1;
-  uint32 limit = 2;
-}
-
-message OffsetNode {
-  LogicalPlanNode input = 1;
-  uint32 offset = 2;
-}
-
-message SelectionExecNode {
-  datafusion.LogicalExprNode expr = 1;
-}
-
-message SubqueryAliasNode {
-  LogicalPlanNode input = 1;
-  string alias = 2;
-}
-
 ///////////////////////////////////////////////////////////////////////////////////////////////////
 // Ballista Physical Plan
 ///////////////////////////////////////////////////////////////////////////////////////////////////
@@ -480,7 +263,7 @@ message HashJoinExecNode {
   PhysicalPlanNode left = 1;
   PhysicalPlanNode right = 2;
   repeated JoinOn on = 3;
-  JoinType join_type = 4;
+  datafusion.JoinType join_type = 4;
   PartitionMode partition_mode = 6;
   bool null_equals_null = 7;
   JoinFilter filter = 8;
@@ -893,7 +676,7 @@ message GetJobStatusResult {
 
 message GetFileMetadataParams {
   string path = 1;
-  FileType file_type = 2;
+  datafusion.FileType file_type = 2;
 }
 
 message GetFileMetadataResult {
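
With the hunks above, Ballista's own copies of the logical plan messages are deleted; they reappear in datafusion.proto in the next diff, and Ballista consumes them through the datafusion-proto crate's prost-generated types. A hedged wire-level sketch, assuming the generated datafusion_proto::protobuf::LogicalPlanNode and the standard prost::Message API:

    use prost::Message;

    use datafusion_proto::protobuf::LogicalPlanNode;

    // Encode a plan node to protobuf bytes and decode it back, exercising
    // the relocated message definitions.
    fn wire_roundtrip(
        node: &LogicalPlanNode,
    ) -> Result<LogicalPlanNode, prost::DecodeError> {
        let bytes = node.encode_to_vec();
        LogicalPlanNode::decode(bytes.as_slice())
    }
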
diff --git a/ballista/rust/core/proto/datafusion.proto b/ballista/rust/core/proto/datafusion.proto
index 9999abbf..09c97029 100644
--- a/ballista/rust/core/proto/datafusion.proto
+++ b/ballista/rust/core/proto/datafusion.proto
@@ -21,7 +21,7 @@ syntax = "proto3";
 package datafusion;
 
 option java_multiple_files = true;
-option java_package = "org.datafusioncompute.protobuf";
+option java_package = "org.apache.arrow.datafusion.protobuf";
 option java_outer_classname = "DatafusionProto";
 
 message ColumnRelation {
@@ -40,6 +40,228 @@ message DfField{
 
 message DfSchema {
   repeated DfField columns = 1;
+  map<string, string> metadata = 2;
+}
+
+// logical plan
+// LogicalPlan is a nested type
+message LogicalPlanNode {
+  oneof LogicalPlanType {
+    ListingTableScanNode listing_scan = 1;
+    ProjectionNode projection = 3;
+    SelectionNode selection = 4;
+    LimitNode limit = 5;
+    AggregateNode aggregate = 6;
+    JoinNode join = 7;
+    SortNode sort = 8;
+    RepartitionNode repartition = 9;
+    EmptyRelationNode empty_relation = 10;
+    CreateExternalTableNode create_external_table = 11;
+    ExplainNode explain = 12;
+    WindowNode window = 13;
+    AnalyzeNode analyze = 14;
+    CrossJoinNode cross_join = 15;
+    ValuesNode values = 16;
+    LogicalExtensionNode extension = 17;
+    CreateCatalogSchemaNode create_catalog_schema = 18;
+    UnionNode union = 19;
+    CreateCatalogNode create_catalog = 20;
+    SubqueryAliasNode subquery_alias = 21;
+    CreateViewNode create_view = 22;
+    DistinctNode distinct = 23;
+  }
+}
+
+message LogicalExtensionNode {
+  bytes node = 1;
+  repeated LogicalPlanNode inputs = 2;
+}
+
+message ProjectionColumns {
+  repeated string columns = 1;
+}
+
+message CsvFormat {
+  bool has_header = 1;
+  string delimiter = 2;
+}
+
+message ParquetFormat {
+  bool enable_pruning = 1;
+}
+
+message AvroFormat {}
+
+message ListingTableScanNode {
+  string table_name = 1;
+  repeated string paths = 2;
+  string file_extension = 3;
+  ProjectionColumns projection = 4;
+  datafusion.Schema schema = 5;
+  repeated datafusion.LogicalExprNode filters = 6;
+  repeated string table_partition_cols = 7;
+  bool collect_stat = 8;
+  uint32 target_partitions = 9;
+  oneof FileFormatType {
+    CsvFormat csv = 10;
+    ParquetFormat parquet = 11;
+    AvroFormat avro = 12;
+  }
+}
+
+message ProjectionNode {
+  LogicalPlanNode input = 1;
+  repeated datafusion.LogicalExprNode expr = 2;
+  oneof optional_alias {
+    string alias = 3;
+  }
+}
+
+message SelectionNode {
+  LogicalPlanNode input = 1;
+  datafusion.LogicalExprNode expr = 2;
+}
+
+message SortNode {
+  LogicalPlanNode input = 1;
+  repeated datafusion.LogicalExprNode expr = 2;
+}
+
+message RepartitionNode {
+  LogicalPlanNode input = 1;
+  oneof partition_method {
+    uint64 round_robin = 2;
+    HashRepartition hash = 3;
+  }
+}
+
+message HashRepartition {
+  repeated datafusion.LogicalExprNode hash_expr = 1;
+  uint64 partition_count = 2;
+}
+
+message EmptyRelationNode {
+  bool produce_one_row = 1;
+}
+
+message CreateExternalTableNode {
+  string name = 1;
+  string location = 2;
+  FileType file_type = 3;
+  bool has_header = 4;
+  datafusion.DfSchema schema = 5;
+  repeated string table_partition_cols = 6;
+  bool if_not_exists = 7;
+  string delimiter = 8;
+}
+
+message CreateCatalogSchemaNode {
+  string schema_name = 1;
+  bool if_not_exists = 2;
+  datafusion.DfSchema schema = 3;
+}
+
+message CreateCatalogNode {
+  string catalog_name = 1;
+  bool if_not_exists = 2;
+  datafusion.DfSchema schema = 3;
+}
+
+message CreateViewNode {
+  string name = 1;
+  LogicalPlanNode input = 2;
+  bool or_replace = 3;
+  string definition = 4;
+}
+
+// A node containing data for defining a values list. Unlike in SQL, where it is two-dimensional,
+// here the list is flattened, and with the field n_cols it can be parsed and partitioned into rows.
+message ValuesNode {
+  uint64 n_cols = 1;
+  repeated datafusion.LogicalExprNode values_list = 2;
+}
+
+enum FileType {
+  NdJson = 0;
+  Parquet = 1;
+  CSV = 2;
+  Avro = 3;
+}
+
+message AnalyzeNode {
+  LogicalPlanNode input = 1;
+  bool verbose = 2;
+}
+
+message ExplainNode {
+  LogicalPlanNode input = 1;
+  bool verbose = 2;
+}
+
+message AggregateNode {
+  LogicalPlanNode input = 1;
+  repeated datafusion.LogicalExprNode group_expr = 2;
+  repeated datafusion.LogicalExprNode aggr_expr = 3;
+}
+
+message WindowNode {
+  LogicalPlanNode input = 1;
+  repeated datafusion.LogicalExprNode window_expr = 2;
+}
+
+enum JoinType {
+  INNER = 0;
+  LEFT = 1;
+  RIGHT = 2;
+  FULL = 3;
+  SEMI = 4;
+  ANTI = 5;
+}
+
+enum JoinConstraint {
+  ON = 0;
+  USING = 1;
+}
+
+message JoinNode {
+  LogicalPlanNode left = 1;
+  LogicalPlanNode right = 2;
+  JoinType join_type = 3;
+  JoinConstraint join_constraint = 4;
+  repeated datafusion.Column left_join_column = 5;
+  repeated datafusion.Column right_join_column = 6;
+  bool null_equals_null = 7;
+  LogicalExprNode filter = 8;
+}
+
+message DistinctNode {
+  LogicalPlanNode input = 1;
+}
+
+message UnionNode {
+  repeated LogicalPlanNode inputs = 1;
+}
+
+message CrossJoinNode {
+  LogicalPlanNode left = 1;
+  LogicalPlanNode right = 2;
+}
+
+message LimitNode {
+  LogicalPlanNode input = 1;
+  // The number of rows to skip before fetch; non-positive means don't skip any
+  int64 skip = 2;
+  // Maximum number of rows to fetch; negative means no limit
+  int64 fetch = 3;
+}
+
+message SelectionExecNode {
+  datafusion.LogicalExprNode expr = 1;
+}
+
+message SubqueryAliasNode {
+  LogicalPlanNode input = 1;
+  string alias = 2;
 }
 
 // logical expressions
@@ -76,9 +298,46 @@ message LogicalExprNode {
 
     // window expressions
     WindowExprNode window_expr = 18;
+
+    // AggregateUDF expressions
+    AggregateUDFExprNode aggregate_udf_expr = 19;
+
+    // Scalar UDF expressions
+    ScalarUDFExprNode scalar_udf_expr = 20;
+
+    GetIndexedField get_indexed_field = 21;
+
+    GroupingSetNode grouping_set = 22;
+
+    CubeNode cube = 23;
+
+    RollupNode rollup = 24;
   }
 }
 
+message LogicalExprList {
+  repeated LogicalExprNode expr = 1;
+}
+
+message GroupingSetNode {
+  repeated LogicalExprList expr = 1;
+}
+
+message CubeNode {
+  repeated LogicalExprNode expr = 1;
+}
+
+message RollupNode {
+  repeated LogicalExprNode expr = 1;
+}
+
+
+
+message GetIndexedField {
+  LogicalExprNode expr = 1;
+  ScalarValue key = 2;
+}
+
 message IsNull {
   LogicalExprNode expr = 1;
 }
@@ -177,6 +436,8 @@ enum ScalarFunction {
   Trim=61;
   Upper=62;
   Coalesce=63;
+  Power=64;
+  StructFun=65;
 }
 
 message ScalarFunctionNode {
@@ -202,6 +463,7 @@ enum AggregateFunction {
   APPROX_PERCENTILE_CONT = 14;
   APPROX_MEDIAN=15;
   APPROX_PERCENTILE_CONT_WITH_WEIGHT = 16;
+  GROUPING = 17;
 }
 
 message AggregateExprNode {
@@ -209,6 +471,16 @@ message AggregateExprNode {
   repeated LogicalExprNode expr = 2;
 }
 
+message AggregateUDFExprNode {
+  string fun_name = 1;
+  repeated LogicalExprNode args = 2;
+}
+
+message ScalarUDFExprNode {
+  string fun_name = 1;
+  repeated LogicalExprNode args = 2;
+}
+
 enum BuiltInWindowFunction {
   ROW_NUMBER = 0;
   RANK = 1;
@@ -321,53 +593,53 @@ message Field {
 }
 
 message FixedSizeBinary{
-    int32 length = 1;
+  int32 length = 1;
 }
 
 message Timestamp{
-    TimeUnit time_unit = 1;
-    string timezone = 2;
+  TimeUnit time_unit = 1;
+  string timezone = 2;
 }
 
 enum DateUnit{
-    Day = 0;
-    DateMillisecond = 1;
+  Day = 0;
+  DateMillisecond = 1;
 }
 
 enum TimeUnit{
-    Second = 0;
-    TimeMillisecond = 1;
-    Microsecond = 2;
-    Nanosecond = 3;
+  Second = 0;
+  TimeMillisecond = 1;
+  Microsecond = 2;
+  Nanosecond = 3;
 }
 
 enum IntervalUnit{
-    YearMonth = 0;
-    DayTime = 1;
-    MonthDayNano = 2;
+  YearMonth = 0;
+  DayTime = 1;
+  MonthDayNano = 2;
 }
 
 message Decimal{
-    uint64 whole = 1;
-    uint64 fractional = 2;
+  uint64 whole = 1;
+  uint64 fractional = 2;
 }
 
 message List{
-    Field field_type = 1;
+  Field field_type = 1;
 }
 
 message FixedSizeList{
-    Field field_type = 1;
-    int32 list_size = 2;
+  Field field_type = 1;
+  int32 list_size = 2;
 }
 
 message Dictionary{
-    ArrowType key = 1;
-    ArrowType value = 2;
+  ArrowType key = 1;
+  ArrowType value = 2;
 }
 
 message Struct{
-    repeated Field sub_field_types = 1;
+  repeated Field sub_field_types = 1;
 }
 
 enum UnionMode{
@@ -376,45 +648,53 @@ enum UnionMode{
 }
 
 message Union{
-    repeated Field union_types = 1;
-    UnionMode union_mode = 2;
+  repeated Field union_types = 1;
+  UnionMode union_mode = 2;
+  repeated int32 type_ids = 3;
 }
 
 message ScalarListValue{
-    ScalarType datatype = 1;
-    repeated ScalarValue values = 2;
+  ScalarType datatype = 1;
+  repeated ScalarValue values = 2;
+}
+
+message ScalarTimestampValue {
+  oneof value {
+    int64  time_microsecond_value = 1;
+    int64  time_nanosecond_value = 2;
+    int64 time_second_value = 3;
+    int64 time_millisecond_value = 4;
+  };
+  string timezone = 5;
 }
 
 message ScalarValue{
-    oneof value {
-        bool   bool_value = 1;
-        string utf8_value = 2;
-        string large_utf8_value = 3;
-        int32  int8_value = 4;
-        int32  int16_value = 5;
-        int32  int32_value = 6;
-        int64  int64_value = 7;
-        uint32 uint8_value = 8;
-        uint32 uint16_value = 9;
-        uint32 uint32_value = 10;
-        uint64 uint64_value = 11;
-        float  float32_value = 12;
-        double float64_value = 13;
-        //Literal Date32 value always has a unit of day
-        int32  date_32_value = 14;
-        int64  time_microsecond_value = 15;
-        int64  time_nanosecond_value = 16;
-        ScalarListValue list_value = 17;
-        ScalarType null_list_value = 18;
-
-        PrimitiveScalarType null_value = 19;
-        Decimal128 decimal128_value = 20;
-        int64 date_64_value = 21;
-        int64 time_second_value = 22;
-        int64 time_millisecond_value = 23;
-        int32 interval_yearmonth_value = 24;
-        int64 interval_daytime_value = 25;
-    }
+  oneof value {
+    bool   bool_value = 1;
+    string utf8_value = 2;
+    string large_utf8_value = 3;
+    int32  int8_value = 4;
+    int32  int16_value = 5;
+    int32  int32_value = 6;
+    int64  int64_value = 7;
+    uint32 uint8_value = 8;
+    uint32 uint16_value = 9;
+    uint32 uint32_value = 10;
+    uint64 uint64_value = 11;
+    float  float32_value = 12;
+    double float64_value = 13;
+    //Literal Date32 value always has a unit of day
+    int32  date_32_value = 14;
+    ScalarListValue list_value = 17;
+    ScalarType null_list_value = 18;
+
+    PrimitiveScalarType null_value = 19;
+    Decimal128 decimal128_value = 20;
+    int64 date_64_value = 21;
+    int32 interval_yearmonth_value = 24;
+    int64 interval_daytime_value = 25;
+    ScalarTimestampValue timestamp_value = 26;
+  }
 }
 
 message Decimal128{
@@ -427,42 +707,42 @@ message Decimal128{
 // List
 enum PrimitiveScalarType{
 
-    BOOL = 0;     // arrow::Type::BOOL
-    UINT8 = 1;    // arrow::Type::UINT8
-    INT8 = 2;     // arrow::Type::INT8
-    UINT16 = 3;   // represents arrow::Type fields in src/arrow/type.h
-    INT16 = 4;
-    UINT32 = 5;
-    INT32 = 6;
-    UINT64 = 7;
-    INT64 = 8;
-    FLOAT32 = 9;
-    FLOAT64 = 10;
-    UTF8 = 11;
-    LARGE_UTF8 = 12;
-    DATE32 = 13;
-    TIME_MICROSECOND = 14;
-    TIME_NANOSECOND = 15;
-    NULL = 16;
-
-    DECIMAL128 = 17;
-    DATE64 = 20;
-    TIME_SECOND = 21;
-    TIME_MILLISECOND = 22;
-    INTERVAL_YEARMONTH = 23;
-    INTERVAL_DAYTIME = 24;
+  BOOL = 0;     // arrow::Type::BOOL
+  UINT8 = 1;    // arrow::Type::UINT8
+  INT8 = 2;     // arrow::Type::INT8
+  UINT16 = 3;   // represents arrow::Type fields in src/arrow/type.h
+  INT16 = 4;
+  UINT32 = 5;
+  INT32 = 6;
+  UINT64 = 7;
+  INT64 = 8;
+  FLOAT32 = 9;
+  FLOAT64 = 10;
+  UTF8 = 11;
+  LARGE_UTF8 = 12;
+  DATE32 = 13;
+  TIME_MICROSECOND = 14;
+  TIME_NANOSECOND = 15;
+  NULL = 16;
+
+  DECIMAL128 = 17;
+  DATE64 = 20;
+  TIME_SECOND = 21;
+  TIME_MILLISECOND = 22;
+  INTERVAL_YEARMONTH = 23;
+  INTERVAL_DAYTIME = 24;
 }
 
 message ScalarType{
-    oneof datatype{
-        PrimitiveScalarType scalar = 1;
-        ScalarListType list = 2;
-    }
+  oneof datatype{
+    PrimitiveScalarType scalar = 1;
+    ScalarListType list = 2;
+  }
 }
 
 message ScalarListType{
-    repeated string field_names = 3;
-    PrimitiveScalarType deepest_type = 2;
+  repeated string field_names = 3;
+  PrimitiveScalarType deepest_type = 2;
 }
 
 // Broke out into multiple message types so that type
@@ -470,40 +750,40 @@ message ScalarListType{
 //All types that are of the empty message types contain no additional metadata
 // about the type
 message ArrowType{
-    oneof arrow_type_enum{
-        EmptyMessage NONE = 1;     // arrow::Type::NA
-        EmptyMessage BOOL =  2;     // arrow::Type::BOOL
-        EmptyMessage UINT8 = 3;    // arrow::Type::UINT8
-        EmptyMessage INT8 =  4;     // arrow::Type::INT8
-        EmptyMessage UINT16 =5;   // represents arrow::Type fields in src/arrow/type.h
-        EmptyMessage INT16 = 6;
-        EmptyMessage UINT32 =7;
-        EmptyMessage INT32 = 8;
-        EmptyMessage UINT64 =9;
-        EmptyMessage INT64 =10 ;
-        EmptyMessage FLOAT16 =11 ;
-        EmptyMessage FLOAT32 =12 ;
-        EmptyMessage FLOAT64 =13 ;
-        EmptyMessage UTF8 =14 ;
-        EmptyMessage LARGE_UTF8 = 32;
-        EmptyMessage BINARY =15 ;
-        int32 FIXED_SIZE_BINARY =16 ;
-        EmptyMessage LARGE_BINARY = 31;
-        EmptyMessage DATE32 =17 ;
-        EmptyMessage DATE64 =18 ;
-        TimeUnit DURATION = 19;
-        Timestamp TIMESTAMP =20 ;
-        TimeUnit TIME32 =21 ;
-        TimeUnit TIME64 =22 ;
-        IntervalUnit INTERVAL =23 ;
-        Decimal DECIMAL =24 ;
-        List LIST =25;
-        List LARGE_LIST = 26;
-        FixedSizeList FIXED_SIZE_LIST = 27;
-        Struct STRUCT =28;
-        Union UNION =29;
-        Dictionary DICTIONARY =30;
-    }
+  oneof arrow_type_enum{
+    EmptyMessage NONE = 1;     // arrow::Type::NA
+    EmptyMessage BOOL =  2;     // arrow::Type::BOOL
+    EmptyMessage UINT8 = 3;    // arrow::Type::UINT8
+    EmptyMessage INT8 =  4;     // arrow::Type::INT8
+    EmptyMessage UINT16 =5;   // represents arrow::Type fields in src/arrow/type.h
+    EmptyMessage INT16 = 6;
+    EmptyMessage UINT32 =7;
+    EmptyMessage INT32 = 8;
+    EmptyMessage UINT64 =9;
+    EmptyMessage INT64 =10 ;
+    EmptyMessage FLOAT16 =11 ;
+    EmptyMessage FLOAT32 =12 ;
+    EmptyMessage FLOAT64 =13 ;
+    EmptyMessage UTF8 =14 ;
+    EmptyMessage LARGE_UTF8 = 32;
+    EmptyMessage BINARY =15 ;
+    int32 FIXED_SIZE_BINARY =16 ;
+    EmptyMessage LARGE_BINARY = 31;
+    EmptyMessage DATE32 =17 ;
+    EmptyMessage DATE64 =18 ;
+    TimeUnit DURATION = 19;
+    Timestamp TIMESTAMP =20 ;
+    TimeUnit TIME32 =21 ;
+    TimeUnit TIME64 =22 ;
+    IntervalUnit INTERVAL =23 ;
+    Decimal DECIMAL =24 ;
+    List LIST =25;
+    List LARGE_LIST = 26;
+    FixedSizeList FIXED_SIZE_LIST = 27;
+    Struct STRUCT =28;
+    Union UNION =29;
+    Dictionary DICTIONARY =30;
+  }
 }
 
 //Useful for representing an empty enum variant in rust
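
One behavioral change worth noting in the relocated messages: the old Ballista LimitNode carried a single uint32 limit, while the new datafusion LimitNode splits it into int64 skip and int64 fetch with the sentinel conventions documented in the field comments (non-positive skip means skip nothing, negative fetch means no limit). A hedged sketch of applying those conventions to the prost-generated struct, assuming prost's usual i64 field mapping:

    use datafusion_proto::protobuf::LimitNode;

    // Map optional skip/fetch counts onto the sentinel encoding used by
    // the new LimitNode message.
    fn limit_node(skip: Option<u32>, fetch: Option<u32>) -> LimitNode {
        LimitNode {
            input: None, // the child LogicalPlanNode would go here
            skip: skip.map(i64::from).unwrap_or(0),    // <= 0: skip nothing
            fetch: fetch.map(i64::from).unwrap_or(-1), // < 0: no limit
        }
    }
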
diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
deleted file mode 100644
index a17c646c..00000000
--- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
+++ /dev/null
@@ -1,52 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Serde code to convert from protocol buffers to Rust data structures.
-
-use crate::error::BallistaError;
-use crate::serde::protobuf;
-use std::convert::TryFrom;
-
-impl TryFrom<i32> for protobuf::FileType {
-    type Error = BallistaError;
-    fn try_from(value: i32) -> Result<Self, Self::Error> {
-        use protobuf::FileType;
-        match value {
-            _x if _x == FileType::NdJson as i32 => Ok(FileType::NdJson),
-            _x if _x == FileType::Parquet as i32 => Ok(FileType::Parquet),
-            _x if _x == FileType::Csv as i32 => Ok(FileType::Csv),
-            _x if _x == FileType::Avro as i32 => Ok(FileType::Avro),
-            invalid => Err(BallistaError::General(format!(
-                "Attempted to convert invalid i32 to protobuf::Filetype: {}",
-                invalid
-            ))),
-        }
-    }
-}
-
-#[allow(clippy::from_over_into)]
-impl Into<datafusion::logical_plan::FileType> for protobuf::FileType {
-    fn into(self) -> datafusion::logical_plan::FileType {
-        use datafusion::logical_plan::FileType;
-        match self {
-            protobuf::FileType::NdJson => FileType::NdJson,
-            protobuf::FileType::Parquet => FileType::Parquet,
-            protobuf::FileType::Csv => FileType::CSV,
-            protobuf::FileType::Avro => FileType::Avro,
-        }
-    }
-}
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
deleted file mode 100644
index 5952a095..00000000
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ /dev/null
@@ -1,424 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-pub mod from_proto;
-
-#[macro_export]
-macro_rules! into_logical_plan {
-    ($PB:expr, $CTX:expr, $CODEC:expr) => {{
-        if let Some(field) = $PB.as_ref() {
-            field.as_ref().try_into_logical_plan($CTX, $CODEC)
-        } else {
-            Err(proto_error("Missing required field in protobuf"))
-        }
-    }};
-}
-
-#[cfg(test)]
-mod roundtrip_tests {
-
-    use super::super::{super::error::Result, protobuf};
-    use crate::serde::{AsLogicalPlan, BallistaCodec};
-    use async_trait::async_trait;
-    use core::panic;
-    use datafusion::common::DFSchemaRef;
-    use datafusion::datasource::listing::ListingTableUrl;
-    use datafusion::logical_plan::source_as_provider;
-    use datafusion::{
-        arrow::datatypes::{DataType, Field, Schema},
-        common::ToDFSchema,
-        datafusion_data_access::{
-            self,
-            object_store::{FileMetaStream, ListEntryStream, ObjectReader, ObjectStore},
-            SizedFile,
-        },
-        datasource::listing::ListingTable,
-        logical_expr::{
-            binary_expr, col,
-            logical_plan::{
-                CreateExternalTable, FileType, LogicalPlan, LogicalPlanBuilder,
-                Repartition,
-            },
-            Expr, Operator,
-        },
-        prelude::*,
-    };
-    use std::io;
-    use std::sync::Arc;
-
-    #[derive(Debug)]
-    struct TestObjectStore {}
-
-    #[async_trait]
-    impl ObjectStore for TestObjectStore {
-        async fn list_file(
-            &self,
-            _prefix: &str,
-        ) -> datafusion_data_access::Result<FileMetaStream> {
-            Err(io::Error::new(
-                io::ErrorKind::Unsupported,
-                "this is only a test object store".to_string(),
-            ))
-        }
-
-        async fn list_dir(
-            &self,
-            _prefix: &str,
-            _delimiter: Option<String>,
-        ) -> datafusion_data_access::Result<ListEntryStream> {
-            Err(io::Error::new(
-                io::ErrorKind::Unsupported,
-                "this is only a test object store".to_string(),
-            ))
-        }
-
-        fn file_reader(
-            &self,
-            _file: SizedFile,
-        ) -> datafusion_data_access::Result<Arc<dyn ObjectReader>> {
-            Err(io::Error::new(
-                io::ErrorKind::Unsupported,
-                "this is only a test object store".to_string(),
-            ))
-        }
-    }
-
-    // Given an instance of a LogicalPlan, converts it to protobuf and back, using debug formatting to test equality.
-    macro_rules! roundtrip_test {
-        ($initial_struct:ident, $proto_type:ty, $struct_type:ty) => {
-            let proto: $proto_type = (&$initial_struct).try_into()?;
-
-            let round_trip: $struct_type = (&proto).try_into()?;
-
-            assert_eq!(
-                format!("{:?}", $initial_struct),
-                format!("{:?}", round_trip)
-            );
-        };
-        ($initial_struct:ident, $struct_type:ty) => {
-            roundtrip_test!($initial_struct, protobuf::LogicalPlanNode, $struct_type);
-        };
-        ($initial_struct:ident) => {
-            let ctx = SessionContext::new();
-            let codec: BallistaCodec<
-                datafusion_proto::protobuf::LogicalPlanNode,
-                protobuf::PhysicalPlanNode,
-            > = BallistaCodec::default();
-            let proto: datafusion_proto::protobuf::LogicalPlanNode =
-                datafusion_proto::protobuf::LogicalPlanNode::try_from_logical_plan(
-                    &$initial_struct,
-                    codec.logical_extension_codec(),
-                )
-                .expect("from logical plan");
-            let round_trip: LogicalPlan = proto
-                .try_into_logical_plan(&ctx, codec.logical_extension_codec())
-                .expect("to logical plan");
-
-            assert_eq!(
-                format!("{:?}", $initial_struct),
-                format!("{:?}", round_trip)
-            );
-        };
-        ($initial_struct:ident, $ctx:ident) => {
-            let codec: BallistaCodec<
-                protobuf::LogicalPlanNode,
-                protobuf::PhysicalPlanNode,
-            > = BallistaCodec::default();
-            let proto: datafusion_proto::protobuf::LogicalPlanNode =
-                protobuf::LogicalPlanNode::try_from_logical_plan(&$initial_struct)
-                    .expect("from logical plan");
-            let round_trip: LogicalPlan = proto
-                .try_into_logical_plan(&$ctx, codec.logical_extension_codec())
-                .expect("to logical plan");
-
-            assert_eq!(
-                format!("{:?}", $initial_struct),
-                format!("{:?}", round_trip)
-            );
-        };
-    }
-
-    #[tokio::test]
-    async fn roundtrip_repartition() -> Result<()> {
-        use datafusion::logical_plan::Partitioning;
-
-        let test_partition_counts = [usize::MIN, usize::MAX, 43256];
-
-        let test_expr: Vec<Expr> =
-            vec![col("c1") + col("c2"), Expr::Literal((4.0).into())];
-
-        let plan = std::sync::Arc::new(
-            test_scan_csv("employee", Some(vec![3, 4]))
-                .await?
-                .sort(vec![col("salary")])?
-                .build()?,
-        );
-
-        for partition_count in test_partition_counts.iter() {
-            let rr_repartition = Partitioning::RoundRobinBatch(*partition_count);
-
-            let roundtrip_plan = LogicalPlan::Repartition(Repartition {
-                input: plan.clone(),
-                partitioning_scheme: rr_repartition,
-            });
-
-            roundtrip_test!(roundtrip_plan);
-
-            let h_repartition = Partitioning::Hash(test_expr.clone(), *partition_count);
-
-            let roundtrip_plan = LogicalPlan::Repartition(Repartition {
-                input: plan.clone(),
-                partitioning_scheme: h_repartition,
-            });
-
-            roundtrip_test!(roundtrip_plan);
-
-            let no_expr_hrepartition = Partitioning::Hash(Vec::new(), *partition_count);
-
-            let roundtrip_plan = LogicalPlan::Repartition(Repartition {
-                input: plan.clone(),
-                partitioning_scheme: no_expr_hrepartition,
-            });
-
-            roundtrip_test!(roundtrip_plan);
-        }
-
-        Ok(())
-    }
-
-    #[test]
-    fn roundtrip_create_external_table() -> Result<()> {
-        let schema = test_schema();
-
-        let df_schema_ref = schema.to_dfschema_ref()?;
-
-        let filetypes: [FileType; 4] = [
-            FileType::NdJson,
-            FileType::Parquet,
-            FileType::CSV,
-            FileType::Avro,
-        ];
-
-        for file in filetypes.iter() {
-            let create_table_node =
-                LogicalPlan::CreateExternalTable(CreateExternalTable {
-                    schema: df_schema_ref.clone(),
-                    name: String::from("TestName"),
-                    location: String::from("employee.csv"),
-                    file_type: *file,
-                    has_header: true,
-                    delimiter: ',',
-                    table_partition_cols: vec![],
-                    if_not_exists: false,
-                });
-
-            roundtrip_test!(create_table_node);
-        }
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn roundtrip_analyze() -> Result<()> {
-        let verbose_plan = test_scan_csv("employee", Some(vec![3, 4]))
-            .await?
-            .sort(vec![col("salary")])?
-            .explain(true, true)?
-            .build()?;
-
-        let plan = test_scan_csv("employee", Some(vec![3, 4]))
-            .await?
-            .sort(vec![col("salary")])?
-            .explain(false, true)?
-            .build()?;
-
-        roundtrip_test!(plan);
-
-        roundtrip_test!(verbose_plan);
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn roundtrip_explain() -> Result<()> {
-        let verbose_plan = test_scan_csv("employee", Some(vec![3, 4]))
-            .await?
-            .sort(vec![col("salary")])?
-            .explain(true, false)?
-            .build()?;
-
-        let plan = test_scan_csv("employee", Some(vec![3, 4]))
-            .await?
-            .sort(vec![col("salary")])?
-            .explain(false, false)?
-            .build()?;
-
-        roundtrip_test!(plan);
-
-        roundtrip_test!(verbose_plan);
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn roundtrip_join() -> Result<()> {
-        let scan_plan = test_scan_csv("employee1", Some(vec![0, 3, 4]))
-            .await?
-            .build()?;
-        let filter = binary_expr(col("employee1.x"), Operator::Gt, col("employee2.y"));
-
-        let plan = test_scan_csv("employee2", Some(vec![0, 3, 4]))
-            .await?
-            .join(
-                &scan_plan,
-                JoinType::Inner,
-                (vec!["id"], vec!["id"]),
-                Some(filter),
-            )?
-            .build()?;
-
-        roundtrip_test!(plan);
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn roundtrip_sort() -> Result<()> {
-        let plan = test_scan_csv("employee", Some(vec![3, 4]))
-            .await?
-            .sort(vec![col("salary")])?
-            .build()?;
-        roundtrip_test!(plan);
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn roundtrip_empty_relation() -> Result<()> {
-        let plan_false = LogicalPlanBuilder::empty(false).build()?;
-
-        roundtrip_test!(plan_false);
-
-        let plan_true = LogicalPlanBuilder::empty(true).build()?;
-
-        roundtrip_test!(plan_true);
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn roundtrip_logical_plan() -> Result<()> {
-        let plan = test_scan_csv("employee", Some(vec![3, 4]))
-            .await?
-            .aggregate(vec![col("state")], vec![max(col("salary"))])?
-            .build()?;
-
-        roundtrip_test!(plan);
-
-        Ok(())
-    }
-
-    #[ignore] // see https://github.com/apache/arrow-datafusion/issues/2546
-    #[tokio::test]
-    async fn roundtrip_logical_plan_custom_ctx() -> Result<()> {
-        let ctx = SessionContext::new();
-        let codec: BallistaCodec<
-            datafusion_proto::protobuf::LogicalPlanNode,
-            protobuf::PhysicalPlanNode,
-        > = BallistaCodec::default();
-        let custom_object_store = Arc::new(TestObjectStore {});
-        ctx.runtime_env()
-            .register_object_store("test", custom_object_store.clone());
-
-        let table_path = "test:///employee.csv";
-        let url = ListingTableUrl::parse(table_path).unwrap();
-        let os = ctx.runtime_env().object_store(&url)?;
-        assert_eq!("TestObjectStore", &format!("{:?}", os));
-        assert_eq!(table_path, &url.to_string());
-
-        let schema = test_schema();
-        let plan = ctx
-            .read_csv(
-                table_path,
-                CsvReadOptions::new().schema(&schema).has_header(true),
-            )
-            .await?
-            .to_logical_plan()?;
-
-        let proto: datafusion_proto::protobuf::LogicalPlanNode =
-            datafusion_proto::protobuf::LogicalPlanNode::try_from_logical_plan(
-                &plan,
-                codec.logical_extension_codec(),
-            )
-            .expect("from logical plan");
-        let round_trip: LogicalPlan = proto
-            .try_into_logical_plan(&ctx, codec.logical_extension_codec())
-            .expect("to logical plan");
-
-        assert_eq!(format!("{:?}", plan), format!("{:?}", round_trip));
-
-        let table_path = match round_trip {
-            LogicalPlan::TableScan(scan) => {
-                let source = source_as_provider(&scan.source)?;
-                match source.as_ref().as_any().downcast_ref::<ListingTable>() {
-                    Some(listing_table) => listing_table.table_path().clone(),
-                    _ => panic!("expected a ListingTable"),
-                }
-            }
-            _ => panic!("expected a TableScan"),
-        };
-
-        assert_eq!(table_path.as_str(), url.as_str());
-
-        Ok(())
-    }
-
-    fn test_schema() -> Schema {
-        Schema::new(vec![
-            Field::new("id", DataType::Int32, false),
-            Field::new("first_name", DataType::Utf8, false),
-            Field::new("last_name", DataType::Utf8, false),
-            Field::new("state", DataType::Utf8, false),
-            Field::new("salary", DataType::Int32, false),
-        ])
-    }
-
-    async fn test_scan_csv(
-        table_name: &str,
-        projection: Option<Vec<usize>>,
-    ) -> Result<LogicalPlanBuilder> {
-        let schema = test_schema();
-        let ctx = SessionContext::new();
-        let options = CsvReadOptions::new().schema(&schema);
-
-        let uri = format!("file:///{}.csv", table_name);
-        ctx.register_csv(table_name, &uri, options).await?;
-
-        let df = ctx.table(table_name)?;
-        let plan = match df.to_logical_plan()? {
-            LogicalPlan::TableScan(ref scan) => {
-                let mut scan = scan.clone();
-                scan.projection = projection;
-                let mut projected_schema = scan.projected_schema.as_ref().clone();
-                projected_schema = projected_schema.replace_qualifier(table_name);
-                scan.projected_schema = DFSchemaRef::new(projected_schema);
-                LogicalPlan::TableScan(scan)
-            }
-            _ => unimplemented!(),
-        };
-        Ok(LogicalPlanBuilder::from(plan))
-    }
-}
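
The deleted module above carried Ballista's own roundtrip tests for logical plans; equivalent serialization now flows through datafusion-proto, which is what the remaining serde code below delegates to. A condensed sketch of the roundtrip the deleted macro performed, assuming datafusion_proto's AsLogicalPlan trait and its DefaultLogicalExtensionCodec:

    use datafusion::logical_plan::LogicalPlan;
    use datafusion::prelude::SessionContext;
    use datafusion_proto::logical_plan::{AsLogicalPlan, DefaultLogicalExtensionCodec};
    use datafusion_proto::protobuf::LogicalPlanNode;

    // Plan -> protobuf -> plan, compared via Debug formatting as the
    // deleted roundtrip_test! macro did.
    fn roundtrip(
        ctx: &SessionContext,
        plan: &LogicalPlan,
    ) -> datafusion::error::Result<()> {
        let codec = DefaultLogicalExtensionCodec {};
        let proto = LogicalPlanNode::try_from_logical_plan(plan, &codec)?;
        let round_trip = proto.try_into_logical_plan(ctx, &codec)?;
        assert_eq!(format!("{:?}", plan), format!("{:?}", round_trip));
        Ok(())
    }
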
diff --git a/ballista/rust/core/src/serde/mod.rs b/ballista/rust/core/src/serde/mod.rs
index a955c099..20979f12 100644
--- a/ballista/rust/core/src/serde/mod.rs
+++ b/ballista/rust/core/src/serde/mod.rs
@@ -20,7 +20,7 @@
 
 use crate::{error::BallistaError, serde::scheduler::Action as BallistaAction};
 use datafusion::execution::runtime_env::RuntimeEnv;
-use datafusion::logical_plan::{FunctionRegistry, JoinConstraint, JoinType, Operator};
+use datafusion::logical_plan::{FunctionRegistry, Operator};
 use datafusion::physical_plan::join_utils::JoinSide;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion_proto::logical_plan::{
@@ -39,7 +39,6 @@ pub mod protobuf {
     include!(concat!(env!("OUT_DIR"), "/ballista.protobuf.rs"));
 }
 
-pub mod logical_plan;
 pub mod physical_plan;
 pub mod scheduler;
 
@@ -221,32 +220,6 @@ pub(crate) fn from_proto_binary_op(op: &str) -> Result<Operator, BallistaError>
     }
 }
 
-impl From<protobuf::JoinType> for JoinType {
-    fn from(t: protobuf::JoinType) -> Self {
-        match t {
-            protobuf::JoinType::Inner => JoinType::Inner,
-            protobuf::JoinType::Left => JoinType::Left,
-            protobuf::JoinType::Right => JoinType::Right,
-            protobuf::JoinType::Full => JoinType::Full,
-            protobuf::JoinType::Semi => JoinType::Semi,
-            protobuf::JoinType::Anti => JoinType::Anti,
-        }
-    }
-}
-
-impl From<JoinType> for protobuf::JoinType {
-    fn from(t: JoinType) -> Self {
-        match t {
-            JoinType::Inner => protobuf::JoinType::Inner,
-            JoinType::Left => protobuf::JoinType::Left,
-            JoinType::Right => protobuf::JoinType::Right,
-            JoinType::Full => protobuf::JoinType::Full,
-            JoinType::Semi => protobuf::JoinType::Semi,
-            JoinType::Anti => protobuf::JoinType::Anti,
-        }
-    }
-}
-
 impl From<protobuf::JoinSide> for JoinSide {
     fn from(t: protobuf::JoinSide) -> Self {
         match t {
@@ -265,24 +238,6 @@ impl From<JoinSide> for protobuf::JoinSide {
     }
 }
 
-impl From<protobuf::JoinConstraint> for JoinConstraint {
-    fn from(t: protobuf::JoinConstraint) -> Self {
-        match t {
-            protobuf::JoinConstraint::On => JoinConstraint::On,
-            protobuf::JoinConstraint::Using => JoinConstraint::Using,
-        }
-    }
-}
-
-impl From<JoinConstraint> for protobuf::JoinConstraint {
-    fn from(t: JoinConstraint) -> Self {
-        match t {
-            JoinConstraint::On => protobuf::JoinConstraint::On,
-            JoinConstraint::Using => protobuf::JoinConstraint::Using,
-        }
-    }
-}
-
 fn byte_to_string(b: u8) -> Result<String, BallistaError> {
     let b = &[b];
     let b = std::str::from_utf8(b)
@@ -397,7 +352,7 @@ mod tests {
             &self,
             exprs: &[Expr],
             inputs: &[LogicalPlan],
-        ) -> Arc<dyn UserDefinedLogicalNode + Send + Sync> {
+        ) -> Arc<dyn UserDefinedLogicalNode> {
             assert_eq!(inputs.len(), 1, "input size inconsistent");
             assert_eq!(exprs.len(), 1, "expression size inconsistent");
             Arc::new(TopKPlanNode {
@@ -494,9 +449,10 @@ mod tests {
 
     struct TopKPlanner {}
 
+    #[async_trait]
     impl ExtensionPlanner for TopKPlanner {
         /// Create a physical plan for an extension node
-        fn plan_extension(
+        async fn plan_extension(
             &self,
             _planner: &dyn PhysicalPlanner,
             node: &dyn UserDefinedLogicalNode,
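
The last hunk above makes ExtensionPlanner an async trait, so custom planners must add #[async_trait] and mark plan_extension as async. A hedged skeleton of an updated implementation, assuming the remaining parameters and module paths of DataFusion's ExtensionPlanner at this revision:

    use std::sync::Arc;

    use async_trait::async_trait;
    use datafusion::error::Result;
    use datafusion::execution::context::SessionState;
    use datafusion::logical_plan::{LogicalPlan, UserDefinedLogicalNode};
    use datafusion::physical_plan::planner::{ExtensionPlanner, PhysicalPlanner};
    use datafusion::physical_plan::ExecutionPlan;

    struct NoopPlanner;

    #[async_trait]
    impl ExtensionPlanner for NoopPlanner {
        // The method is now async, so planners can await I/O (catalog or
        // schema lookups, for example) while building a physical plan.
        async fn plan_extension(
            &self,
            _planner: &dyn PhysicalPlanner,
            _node: &dyn UserDefinedLogicalNode,
            _logical_inputs: &[&LogicalPlan],
            _physical_inputs: &[Arc<dyn ExecutionPlan>],
            _session_state: &SessionState,
        ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
            // Ok(None) signals that this planner does not handle the node.
            Ok(None)
        }
    }
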
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index d699a011..6268ab28 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -21,13 +21,8 @@ use std::convert::{TryFrom, TryInto};
 use std::ops::Deref;
 use std::sync::Arc;
 
-use crate::error::BallistaError;
-
-use crate::convert_required;
-use crate::serde::{from_proto_binary_op, proto_error, protobuf};
 use chrono::{TimeZone, Utc};
-
-use datafusion::datafusion_data_access::{FileMeta, SizedFile};
+use datafusion::arrow::datatypes::Schema;
 use datafusion::datasource::listing::{FileRange, PartitionedFile};
 use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::execution::context::ExecutionProps;
@@ -43,9 +38,15 @@ use datafusion::physical_plan::{
     functions, Partitioning,
 };
 use datafusion::physical_plan::{ColumnStatistics, PhysicalExpr, Statistics};
+use object_store::path::Path;
+use object_store::ObjectMeta;
 
 use protobuf::physical_expr_node::ExprType;
 
+use crate::convert_required;
+use crate::error::BallistaError;
+use crate::serde::{from_proto_binary_op, proto_error, protobuf};
+
 impl From<&protobuf::PhysicalColumn> for Column {
     fn from(c: &protobuf::PhysicalColumn) -> Column {
         Column::new(&c.name, c.index as usize)
@@ -55,6 +56,7 @@ impl From<&protobuf::PhysicalColumn> for Column {
 pub(crate) fn parse_physical_expr(
     proto: &protobuf::PhysicalExprNode,
     registry: &dyn FunctionRegistry,
+    input_schema: &Schema,
 ) -> Result<Arc<dyn PhysicalExpr>, BallistaError> {
     let expr_type = proto
         .expr_type
@@ -70,9 +72,19 @@ pub(crate) fn parse_physical_expr(
             Arc::new(Literal::new(convert_required!(scalar.value)?))
         }
         ExprType::BinaryExpr(binary_expr) => Arc::new(BinaryExpr::new(
-            parse_required_physical_box_expr(&binary_expr.l, registry, "left")?,
+            parse_required_physical_box_expr(
+                &binary_expr.l,
+                registry,
+                "left",
+                input_schema,
+            )?,
             from_proto_binary_op(&binary_expr.op)?,
-            parse_required_physical_box_expr(&binary_expr.r, registry, "right")?,
+            parse_required_physical_box_expr(
+                &binary_expr.r,
+                registry,
+                "right",
+                input_schema,
+            )?,
         )),
         ExprType::AggregateExpr(_) => {
             return Err(BallistaError::General(
@@ -90,29 +102,33 @@ pub(crate) fn parse_physical_expr(
             ));
         }
         ExprType::IsNullExpr(e) => Arc::new(IsNullExpr::new(
-            parse_required_physical_box_expr(&e.expr, registry, "expr")?,
+            parse_required_physical_box_expr(&e.expr, registry, "expr", input_schema)?,
         )),
         ExprType::IsNotNullExpr(e) => Arc::new(IsNotNullExpr::new(
-            parse_required_physical_box_expr(&e.expr, registry, "expr")?,
+            parse_required_physical_box_expr(&e.expr, registry, "expr", input_schema)?,
         )),
         ExprType::NotExpr(e) => Arc::new(NotExpr::new(parse_required_physical_box_expr(
-            &e.expr, registry, "expr",
+            &e.expr,
+            registry,
+            "expr",
+            input_schema,
         )?)),
         ExprType::Negative(e) => Arc::new(NegativeExpr::new(
-            parse_required_physical_box_expr(&e.expr, registry, "expr")?,
+            parse_required_physical_box_expr(&e.expr, registry, "expr", input_schema)?,
         )),
         ExprType::InList(e) => Arc::new(InListExpr::new(
-            parse_required_physical_box_expr(&e.expr, registry, "expr")?,
+            parse_required_physical_box_expr(&e.expr, registry, "expr", input_schema)?,
             e.list
                 .iter()
-                .map(|x| parse_physical_expr(x, registry))
+                .map(|x| parse_physical_expr(x, registry, input_schema))
                 .collect::<Result<Vec<_>, _>>()?,
             e.negated,
+            input_schema,
         )),
         ExprType::Case(e) => Arc::new(CaseExpr::try_new(
             e.expr
                 .as_ref()
-                .map(|e| parse_physical_expr(e.as_ref(), registry))
+                .map(|e| parse_physical_expr(e.as_ref(), registry, input_schema))
                 .transpose()?,
             e.when_then_expr
                 .iter()
@@ -122,28 +138,29 @@ pub(crate) fn parse_physical_expr(
                             &e.when_expr,
                             registry,
                             "when_expr",
+                            input_schema,
                         )?,
                         parse_required_physical_expr(
                             &e.then_expr,
                             registry,
                             "then_expr",
+                            input_schema,
                         )?,
                     ))
                 })
-                .collect::<Result<Vec<_>, BallistaError>>()?
-                .as_slice(),
+                .collect::<Result<Vec<_>, BallistaError>>()?,
             e.else_expr
                 .as_ref()
-                .map(|e| parse_physical_expr(e.as_ref(), registry))
+                .map(|e| parse_physical_expr(e.as_ref(), registry, input_schema))
                 .transpose()?,
         )?),
         ExprType::Cast(e) => Arc::new(CastExpr::new(
-            parse_required_physical_box_expr(&e.expr, registry, "expr")?,
+            parse_required_physical_box_expr(&e.expr, registry, "expr", input_schema)?,
             convert_required!(e.arrow_type)?,
             DEFAULT_DATAFUSION_CAST_OPTIONS,
         )),
         ExprType::TryCast(e) => Arc::new(TryCastExpr::new(
-            parse_required_physical_box_expr(&e.expr, registry, "expr")?,
+            parse_required_physical_box_expr(&e.expr, registry, "expr", input_schema)?,
             convert_required!(e.arrow_type)?,
         )),
         ExprType::ScalarFunction(e) => {
@@ -157,7 +174,7 @@ pub(crate) fn parse_physical_expr(
             let args = e
                 .args
                 .iter()
-                .map(|x| parse_physical_expr(x, registry))
+                .map(|x| parse_physical_expr(x, registry, input_schema))
                 .collect::<Result<Vec<_>, _>>()?;
 
             // TODO Do not create new the ExecutionProps
@@ -181,7 +198,7 @@ pub(crate) fn parse_physical_expr(
             let args = e
                 .args
                 .iter()
-                .map(|x| parse_physical_expr(x, registry))
+                .map(|x| parse_physical_expr(x, registry, input_schema))
                 .collect::<Result<Vec<_>, _>>()?;
 
             Arc::new(ScalarFunctionExpr::new(
@@ -200,9 +217,10 @@ fn parse_required_physical_box_expr(
     expr: &Option<Box<protobuf::PhysicalExprNode>>,
     registry: &dyn FunctionRegistry,
     field: &str,
+    input_schema: &Schema,
 ) -> Result<Arc<dyn PhysicalExpr>, BallistaError> {
     expr.as_ref()
-        .map(|e| parse_physical_expr(e.as_ref(), registry))
+        .map(|e| parse_physical_expr(e.as_ref(), registry, input_schema))
         .transpose()?
         .ok_or_else(|| {
             BallistaError::General(format!("Missing required field {:?}", field))
@@ -213,9 +231,10 @@ fn parse_required_physical_expr(
     expr: &Option<protobuf::PhysicalExprNode>,
     registry: &dyn FunctionRegistry,
     field: &str,
+    input_schema: &Schema,
 ) -> Result<Arc<dyn PhysicalExpr>, BallistaError> {
     expr.as_ref()
-        .map(|e| parse_physical_expr(e, registry))
+        .map(|e| parse_physical_expr(e, registry, input_schema))
         .transpose()?
         .ok_or_else(|| {
             BallistaError::General(format!("Missing required field {:?}", field))
@@ -258,13 +277,14 @@ impl TryFrom<&protobuf::physical_window_expr_node::WindowFunction> for WindowFun
 pub fn parse_protobuf_hash_partitioning(
     partitioning: Option<&protobuf::PhysicalHashRepartition>,
     registry: &dyn FunctionRegistry,
+    input_schema: &Schema,
 ) -> Result<Option<Partitioning>, BallistaError> {
     match partitioning {
         Some(hash_part) => {
             let expr = hash_part
                 .hash_expr
                 .iter()
-                .map(|e| parse_physical_expr(e, registry))
+                .map(|e| parse_physical_expr(e, registry, input_schema))
                 .collect::<Result<Vec<Arc<dyn PhysicalExpr>>, _>>()?;
 
             Ok(Some(Partitioning::Hash(
@@ -281,16 +301,10 @@ impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
 
     fn try_from(val: &protobuf::PartitionedFile) -> Result<Self, Self::Error> {
         Ok(PartitionedFile {
-            file_meta: FileMeta {
-                sized_file: SizedFile {
-                    path: val.path.clone(),
-                    size: val.size,
-                },
-                last_modified: if val.last_modified_ns == 0 {
-                    None
-                } else {
-                    Some(Utc.timestamp_nanos(val.last_modified_ns as i64))
-                },
+            object_meta: ObjectMeta {
+                location: Path::from(val.path.as_str()),
+                last_modified: Utc.timestamp_nanos(val.last_modified_ns as i64),
+                size: val.size as usize,
             },
             partition_values: val
                 .partition_values
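
The hunks above thread a new input_schema: &Schema argument through parse_physical_expr and its helpers, so deserialized expressions (InList, Case, Cast, scalar functions) can resolve their data types against the schema of the operator's input. A minimal sketch of a caller after this change; decode_expr is an illustrative wrapper, not part of the commit, and assumes the imports already in scope in from_proto.rs:

    // Illustrative wrapper: every expression decode now requires the
    // schema of the operator's input, supplied by the caller.
    fn decode_expr(
        node: &protobuf::PhysicalExprNode,
        registry: &dyn FunctionRegistry,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>, BallistaError> {
        parse_physical_expr(node, registry, input_schema)
    }
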
diff --git a/ballista/rust/core/src/serde/physical_plan/mod.rs b/ballista/rust/core/src/serde/physical_plan/mod.rs
index ac3cb22e..b80e64b1 100644
--- a/ballista/rust/core/src/serde/physical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/physical_plan/mod.rs
@@ -127,7 +127,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
                     .expr
                     .iter()
                     .zip(projection.expr_name.iter())
-                    .map(|(expr, name)| Ok((parse_physical_expr(expr,registry)?, name.to_string())))
+                    .map(|(expr, name)| Ok((parse_physical_expr(expr, registry, input.schema().as_ref())?, name.to_string())))
                     .collect::<Result<Vec<(Arc<dyn PhysicalExpr>, String)>, BallistaError>>(
                     )?;
                 Ok(Arc::new(ProjectionExec::try_new(exprs, input)?))
@@ -142,7 +142,9 @@ impl AsExecutionPlan for PhysicalPlanNode {
                 let predicate = filter
                     .expr
                     .as_ref()
-                    .map(|expr| parse_physical_expr(expr, registry))
+                    .map(|expr| {
+                        parse_physical_expr(expr, registry, input.schema().as_ref())
+                    })
                     .transpose()?
                     .ok_or_else(|| {
                         BallistaError::General(
@@ -200,7 +202,9 @@ impl AsExecutionPlan for PhysicalPlanNode {
                         let expr = hash_part
                             .hash_expr
                             .iter()
-                            .map(|e| parse_physical_expr(e, registry))
+                            .map(|e| {
+                                parse_physical_expr(e, registry, input.schema().as_ref())
+                            })
                             .collect::<Result<Vec<Arc<dyn PhysicalExpr>>, _>>()?;
 
                         Ok(Arc::new(RepartitionExec::try_new(
@@ -285,7 +289,13 @@ impl AsExecutionPlan for PhysicalPlanNode {
                                 let window_node_expr = window_node
                                     .expr
                                     .as_ref()
-                                    .map(|e| parse_physical_expr(e.as_ref(), registry))
+                                    .map(|e| {
+                                        parse_physical_expr(
+                                            e.as_ref(),
+                                            registry,
+                                            &physical_schema,
+                                        )
+                                    })
                                     .transpose()?
                                     .ok_or_else(|| {
                                         proto_error(
@@ -347,7 +357,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
                     .iter()
                     .zip(hash_agg.group_expr_name.iter())
                     .map(|(expr, name)| {
-                        parse_physical_expr(expr, registry)
+                        parse_physical_expr(expr, registry, input.schema().as_ref())
                             .map(|expr| (expr, name.to_string()))
                     })
                     .collect::<Result<Vec<_>, _>>()?;
@@ -357,7 +367,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
                     .iter()
                     .zip(hash_agg.group_expr_name.iter())
                     .map(|(expr, name)| {
-                        parse_physical_expr(expr, registry)
+                        parse_physical_expr(expr, registry, input.schema().as_ref())
                             .map(|expr| (expr, name.to_string()))
                     })
                     .collect::<Result<Vec<_>, _>>()?;
@@ -409,7 +419,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
                                         )?;
 
                                 let input_phy_expr: Vec<Arc<dyn PhysicalExpr>> = agg_node.expr.iter()
-                                    .map(|e| parse_physical_expr(e, registry).unwrap()).collect();
+                                    .map(|e| parse_physical_expr(e, registry, &physical_schema).unwrap()).collect();
 
                                 Ok(create_aggregate_expr(
                                     &aggr_function.into(),
@@ -457,22 +467,29 @@ impl AsExecutionPlan for PhysicalPlanNode {
                         Ok((left, right))
                     })
                     .collect::<Result<_, BallistaError>>()?;
-                let join_type = protobuf::JoinType::from_i32(hashjoin.join_type)
-                    .ok_or_else(|| {
-                        proto_error(format!(
+                let join_type =
+                    datafusion_proto::protobuf::JoinType::from_i32(hashjoin.join_type)
+                        .ok_or_else(|| {
+                            proto_error(format!(
                             "Received a HashJoinNode message with unknown JoinType {}",
                             hashjoin.join_type
                         ))
-                    })?;
+                        })?;
                 let filter = hashjoin
                     .filter
                     .as_ref()
                     .map(|f| {
+                        let schema = f
+                            .schema
+                            .as_ref()
+                            .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
+                            .try_into()?;
+
                         let expression = parse_physical_expr(
                             f.expression.as_ref().ok_or_else(|| {
                                 proto_error("Unexpected empty filter expression")
                             })?,
-                            registry,
+                            registry, &schema
                         )?;
                         let column_indices = f.column_indices
                             .iter()
@@ -489,11 +506,6 @@ impl AsExecutionPlan for PhysicalPlanNode {
                                 })
                             })
                             .collect::<Result<Vec<_>, BallistaError>>()?;
-                        let schema = f
-                            .schema
-                            .as_ref()
-                            .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
-                            .try_into()?;
 
                         Ok(JoinFilter::new(expression, column_indices, schema))
                     })
@@ -558,6 +570,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
                 let output_partitioning = parse_protobuf_hash_partitioning(
                     shuffle_writer.output_partitioning.as_ref(),
                     registry,
+                    input.schema().as_ref(),
                 )?;
 
                 Ok(Arc::new(ShuffleWriterExec::try_new(
@@ -613,7 +626,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
                                 })?
                                 .as_ref();
                             Ok(PhysicalSortExpr {
-                                expr: parse_physical_expr(expr,registry)?,
+                                expr: parse_physical_expr(expr, registry, input.schema().as_ref())?,
                                 options: SortOptions {
                                     descending: !sort_expr.asc,
                                     nulls_first: sort_expr.nulls_first,
@@ -766,7 +779,8 @@ impl AsExecutionPlan for PhysicalPlanNode {
                     }),
                 })
                 .collect();
-            let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
+            let join_type: datafusion_proto::protobuf::JoinType =
+                exec.join_type().to_owned().into();
             let filter = exec
                 .filter()
                 .as_ref()
@@ -1372,6 +1386,7 @@ mod roundtrip_tests {
                 lit(ScalarValue::Int64(Some(2))),
             ],
             false,
+            schema.as_ref(),
         ));
         let and = binary(not, Operator::And, in_list, &schema)?;
         roundtrip_test(Arc::new(FilterExec::try_new(
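
One ordering detail in the HashJoinExec hunk above: the JoinFilter schema is now decoded before the filter expression, since parse_physical_expr needs that schema to rebuild the expression. A condensed sketch of the flow, where proto_filter and column_indices stand in for the surrounding variables and error handling is trimmed:

    // Decode the intermediate filter schema first...
    let schema: Schema = proto_filter
        .schema
        .as_ref()
        .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
        .try_into()?;
    // ...then parse the filter expression against that schema.
    let expression = parse_physical_expr(
        proto_filter.expression.as_ref().unwrap(),
        registry,
        &schema,
    )?;
    let filter = JoinFilter::new(expression, column_indices, schema);
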
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index 7896ddde..16195377 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -349,13 +349,9 @@ impl TryFrom<&PartitionedFile> for protobuf::PartitionedFile {
 
     fn try_from(pf: &PartitionedFile) -> Result<Self, Self::Error> {
         Ok(protobuf::PartitionedFile {
-            path: pf.file_meta.path().to_owned(),
-            size: pf.file_meta.size(),
-            last_modified_ns: pf
-                .file_meta
-                .last_modified
-                .map(|ts| ts.timestamp_nanos() as u64)
-                .unwrap_or(0),
+            path: pf.object_meta.location.as_ref().to_owned(),
+            size: pf.object_meta.size as u64,
+            last_modified_ns: pf.object_meta.last_modified.timestamp_nanos() as u64,
             partition_values: pf
                 .partition_values
                 .iter()
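
The serialization side mirrors the FileMeta-to-ObjectMeta migration seen in from_proto.rs: object_store's ObjectMeta stores last_modified as a plain DateTime<Utc>, so the Option handling and the 0-nanoseconds sentinel disappear. A small standalone sketch of the struct now carried by a PartitionedFile, with made-up values:

    use chrono::{TimeZone, Utc};
    use object_store::path::Path;
    use object_store::ObjectMeta;

    // Made-up example values; the real metadata comes from the object store.
    let meta = ObjectMeta {
        location: Path::from("data/part-0.parquet"),
        last_modified: Utc.timestamp_nanos(1_657_000_000_000_000_000),
        size: 4096,
    };
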
diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml
index 9b32fb8b..d05138ce 100644
--- a/ballista/rust/executor/Cargo.toml
+++ b/ballista/rust/executor/Cargo.toml
@@ -34,14 +34,14 @@ snmalloc = ["snmalloc-rs"]
 
 [dependencies]
 anyhow = "1"
-arrow = { version = "16.0.0" }
-arrow-flight = { version = "16.0.0" }
+arrow = { version = "18.0.0" }
+arrow-flight = { version = "18.0.0" }
 async-trait = "0.1.41"
 ballista-core = { path = "../core", version = "0.7.0" }
 chrono = { version = "0.4", default-features = false }
 configure_me = "0.4.0"
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
 env_logger = "0.9"
 futures = "0.3"
 hyper = "0.14.4"
diff --git a/ballista/rust/executor/src/execution_loop.rs b/ballista/rust/executor/src/execution_loop.rs
index 1e382b02..0520ab65 100644
--- a/ballista/rust/executor/src/execution_loop.rs
+++ b/ballista/rust/executor/src/execution_loop.rs
@@ -178,6 +178,7 @@ async fn run_received_tasks<T: 'static + AsLogicalPlan, U: 'static + AsExecution
     let shuffle_output_partitioning = parse_protobuf_hash_partitioning(
         task.output_partitioning.as_ref(),
         task_context.as_ref(),
+        plan.schema().as_ref(),
     )?;
 
     tokio::spawn(async move {
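
The new third argument gives parse_protobuf_hash_partitioning the schema that the hash expressions are resolved against: here, the output schema of the deserialized plan. When a partitioning is present it decodes to a Partitioning::Hash over those expressions; an illustrative (not committed) way to inspect it:

    use datafusion::physical_plan::Partitioning;
    use log::debug;

    // Illustrative only: peek at the decoded shuffle partitioning.
    if let Some(Partitioning::Hash(exprs, n)) = &shuffle_output_partitioning {
        debug!("shuffling into {} partitions on {} expressions", n, exprs.len());
    }
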
diff --git a/ballista/rust/executor/src/executor_server.rs b/ballista/rust/executor/src/executor_server.rs
index de202a54..158c708a 100644
--- a/ballista/rust/executor/src/executor_server.rs
+++ b/ballista/rust/executor/src/executor_server.rs
@@ -220,6 +220,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
         let shuffle_output_partitioning = parse_protobuf_hash_partitioning(
             task.output_partitioning.as_ref(),
             task_context.as_ref(),
+            plan.schema().as_ref(),
         )?;
 
         let execution_result = self
diff --git a/ballista/rust/scheduler/Cargo.toml b/ballista/rust/scheduler/Cargo.toml
index 5da74bad..8ff40930 100644
--- a/ballista/rust/scheduler/Cargo.toml
+++ b/ballista/rust/scheduler/Cargo.toml
@@ -41,8 +41,8 @@ async-trait = "0.1.41"
 ballista-core = { path = "../core", version = "0.7.0" }
 clap = { version = "3", features = ["derive", "cargo"] }
 configure_me = "0.4.0"
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
 env_logger = "0.9"
 etcd-client = { version = "0.9", optional = true }
 futures = "0.3"
@@ -50,6 +50,7 @@ http = "0.2"
 http-body = "0.4"
 hyper = "0.14.4"
 log = "0.4"
+object_store = "0.3.0"
 parking_lot = "0.12"
 parse_arg = "0.1.3"
 prost = "0.10"
diff --git a/ballista/rust/scheduler/src/scheduler_server/grpc.rs b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
index 01c2058b..0cea1be4 100644
--- a/ballista/rust/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
@@ -15,12 +15,25 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::scheduler_server::event::QueryStageSchedulerEvent;
-use crate::scheduler_server::{
-    create_datafusion_context, update_datafusion_context, SchedulerServer,
-};
-use crate::state::task_scheduler::TaskScheduler;
+use std::convert::TryInto;
+use std::ops::Deref;
+use std::sync::Arc;
+use std::time::Instant;
+use std::time::{SystemTime, UNIX_EPOCH};
+
 use anyhow::Context;
+use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::datasource::file_format::FileFormat;
+use datafusion_proto::logical_plan::AsLogicalPlan;
+use datafusion_proto::protobuf::FileType;
+use futures::TryStreamExt;
+use log::{debug, error, info, trace, warn};
+use object_store::local::LocalFileSystem;
+use object_store::path::Path;
+use object_store::ObjectStore;
+use rand::{distributions::Alphanumeric, thread_rng, Rng};
+use tonic::{Request, Response, Status};
+
 use ballista_core::config::{BallistaConfig, TaskSchedulingPolicy};
 use ballista_core::error::BallistaError;
 use ballista_core::serde::protobuf::execute_query_params::{OptionalSessionId, Query};
@@ -29,30 +42,21 @@ use ballista_core::serde::protobuf::executor_registration::OptionalHost;
 use ballista_core::serde::protobuf::scheduler_grpc_server::SchedulerGrpc;
 use ballista_core::serde::protobuf::{
     job_status, ExecuteQueryParams, ExecuteQueryResult, ExecutorHeartbeat, FailedJob,
-    FileType, GetFileMetadataParams, GetFileMetadataResult, GetJobStatusParams,
-    GetJobStatusResult, HeartBeatParams, HeartBeatResult, JobStatus, PollWorkParams,
-    PollWorkResult, QueuedJob, RegisterExecutorParams, RegisterExecutorResult,
-    UpdateTaskStatusParams, UpdateTaskStatusResult,
+    GetFileMetadataParams, GetFileMetadataResult, GetJobStatusParams, GetJobStatusResult,
+    HeartBeatParams, HeartBeatResult, JobStatus, PollWorkParams, PollWorkResult,
+    QueuedJob, RegisterExecutorParams, RegisterExecutorResult, UpdateTaskStatusParams,
+    UpdateTaskStatusResult,
 };
 use ballista_core::serde::scheduler::{
     ExecutorData, ExecutorDataChange, ExecutorMetadata,
 };
 use ballista_core::serde::AsExecutionPlan;
-use datafusion::datafusion_data_access::object_store::{
-    local::LocalFileSystem, ObjectStore,
+
+use crate::scheduler_server::event::QueryStageSchedulerEvent;
+use crate::scheduler_server::{
+    create_datafusion_context, update_datafusion_context, SchedulerServer,
 };
-use datafusion::datasource::file_format::parquet::ParquetFormat;
-use datafusion::datasource::file_format::FileFormat;
-use datafusion_proto::logical_plan::AsLogicalPlan;
-use futures::TryStreamExt;
-use log::{debug, error, info, trace, warn};
-use rand::{distributions::Alphanumeric, thread_rng, Rng};
-use std::convert::TryInto;
-use std::ops::Deref;
-use std::sync::Arc;
-use std::time::Instant;
-use std::time::{SystemTime, UNIX_EPOCH};
-use tonic::{Request, Response, Status};
+use crate::state::task_scheduler::TaskScheduler;
 
 #[tonic::async_trait]
 impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
@@ -281,7 +285,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
         request: Request<GetFileMetadataParams>,
     ) -> std::result::Result<Response<GetFileMetadataResult>, tonic::Status> {
         // TODO support multiple object stores
-        let obj_store = Arc::new(LocalFileSystem {}) as Arc<dyn ObjectStore>;
+        let obj_store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
         // TODO shouldn't this take a ListingOption object as input?
 
         let GetFileMetadataParams { path, file_type } = request.into_inner();
@@ -300,8 +304,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
             )),
         }?;
 
+        let path = Path::from(path.as_str());
         let file_metas: Vec<_> = obj_store
-            .list_file(&path)
+            .list(Some(&path))
             .await
             .map_err(|e| {
                 let msg = format!("Error listing files: {}", e);
@@ -309,7 +314,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
                 tonic::Status::internal(msg)
             })?
             .try_collect()
-            .await?;
+            .await
+            .map_err(|e| {
+                let msg = format!("Error listing files: {}", e);
+                error!("{}", msg);
+                tonic::Status::internal(msg)
+            })?;
 
         let schema = file_format
             .infer_schema(&obj_store, &file_metas)
@@ -556,9 +566,10 @@ fn generate_job_id() -> String {
 mod test {
     use std::sync::Arc;
 
+    use datafusion::execution::context::default_session_builder;
+    use datafusion_proto::protobuf::LogicalPlanNode;
     use tonic::Request;
 
-    use crate::state::{backend::standalone::StandaloneClient, SchedulerState};
     use ballista_core::error::BallistaError;
     use ballista_core::serde::protobuf::{
         executor_registration::OptionalHost, ExecutorRegistration, PhysicalPlanNode,
@@ -566,8 +577,8 @@ mod test {
     };
     use ballista_core::serde::scheduler::ExecutorSpecification;
     use ballista_core::serde::BallistaCodec;
-    use datafusion::execution::context::default_session_builder;
-    use datafusion_proto::protobuf::LogicalPlanNode;
+
+    use crate::state::{backend::standalone::StandaloneClient, SchedulerState};
 
     use super::{SchedulerGrpc, SchedulerServer};
 
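
The listing path in get_file_metadata moves from datafusion_data_access to the standalone object_store crate (added to the scheduler's Cargo.toml above): the store is constructed with LocalFileSystem::new(), paths are typed object_store::path::Path values, and list() is async, takes an optional prefix, and yields a stream of ObjectMeta. A minimal standalone sketch of the new flow, assuming a tokio runtime; list_local is illustrative, not part of the commit:

    use std::sync::Arc;

    use futures::TryStreamExt;
    use object_store::local::LocalFileSystem;
    use object_store::path::Path;
    use object_store::{ObjectMeta, ObjectStore};

    // List all objects under a prefix on the local filesystem and collect
    // their metadata, mirroring the updated get_file_metadata above.
    async fn list_local(prefix: &str) -> object_store::Result<Vec<ObjectMeta>> {
        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
        let path = Path::from(prefix);
        store.list(Some(&path)).await?.try_collect().await
    }
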
diff --git a/ballista/rust/scheduler/src/scheduler_server/mod.rs b/ballista/rust/scheduler/src/scheduler_server/mod.rs
index 68dede14..10f26b1d 100644
--- a/ballista/rust/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/mod.rs
@@ -15,25 +15,28 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::scheduler_server::event::{QueryStageSchedulerEvent, SchedulerServerEvent};
-use crate::scheduler_server::event_loop::SchedulerServerEventAction;
-use crate::scheduler_server::query_stage_scheduler::QueryStageScheduler;
-use crate::state::backend::StateBackendClient;
-use crate::state::SchedulerState;
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::{SystemTime, UNIX_EPOCH};
+
+use datafusion::execution::context::{default_session_builder, SessionState};
+use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_proto::logical_plan::AsLogicalPlan;
+use tokio::sync::RwLock;
+use tonic::transport::Channel;
+
 use ballista_core::config::{BallistaConfig, TaskSchedulingPolicy};
 use ballista_core::error::Result;
 use ballista_core::event_loop::EventLoop;
 use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient;
 use ballista_core::serde::protobuf::TaskStatus;
 use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
-use datafusion::execution::context::{default_session_builder, SessionState};
-use datafusion::prelude::{SessionConfig, SessionContext};
-use datafusion_proto::logical_plan::AsLogicalPlan;
-use std::collections::HashMap;
-use std::sync::Arc;
-use std::time::{SystemTime, UNIX_EPOCH};
-use tokio::sync::RwLock;
-use tonic::transport::Channel;
+
+use crate::scheduler_server::event::{QueryStageSchedulerEvent, SchedulerServerEvent};
+use crate::scheduler_server::event_loop::SchedulerServerEventAction;
+use crate::scheduler_server::query_stage_scheduler::QueryStageScheduler;
+use crate::state::backend::StateBackendClient;
+use crate::state::SchedulerState;
 
 // include the generated protobuf source as a submodule
 #[allow(clippy::all)]
@@ -221,12 +224,14 @@ pub fn update_datafusion_context(
 ) -> Arc<SessionContext> {
     {
         let mut mut_state = session_ctx.state.write();
-        mut_state.config.target_partitions = config.default_shuffle_partitions();
-        mut_state.config.batch_size = config.default_batch_size();
-        mut_state.config.repartition_joins = config.repartition_joins();
-        mut_state.config.repartition_aggregations = config.repartition_aggregations();
-        mut_state.config.repartition_windows = config.repartition_windows();
-        mut_state.config.parquet_pruning = config.parquet_pruning();
+        // TODO Currently we have to start from the default session config because the interface does not support in-place updates
+        mut_state.config = SessionConfig::default()
+            .with_target_partitions(config.default_shuffle_partitions())
+            .with_batch_size(config.default_batch_size())
+            .with_repartition_joins(config.repartition_joins())
+            .with_repartition_aggregations(config.repartition_aggregations())
+            .with_repartition_windows(config.repartition_windows())
+            .with_parquet_pruning(config.parquet_pruning());
     }
     session_ctx
 }
@@ -277,11 +282,19 @@ impl SessionContextRegistry {
         sessions.remove(session_id)
     }
 }
+
 #[cfg(all(test, feature = "sled"))]
 mod test {
     use std::sync::Arc;
     use std::time::{Duration, Instant};
 
+    use datafusion::arrow::datatypes::{DataType, Field, Schema};
+    use datafusion::execution::context::default_session_builder;
+    use datafusion::logical_plan::{col, sum, LogicalPlan};
+    use datafusion::prelude::{SessionConfig, SessionContext};
+    use datafusion::test_util::scan_empty;
+    use datafusion_proto::protobuf::LogicalPlanNode;
+
     use ballista_core::config::TaskSchedulingPolicy;
     use ballista_core::error::{BallistaError, Result};
     use ballista_core::execution_plans::ShuffleWriterExec;
@@ -290,12 +303,6 @@ mod test {
     };
     use ballista_core::serde::scheduler::ExecutorData;
     use ballista_core::serde::BallistaCodec;
-    use datafusion::arrow::datatypes::{DataType, Field, Schema};
-    use datafusion::execution::context::default_session_builder;
-    use datafusion::logical_plan::{col, sum, LogicalPlan};
-    use datafusion::prelude::{SessionConfig, SessionContext};
-    use datafusion::test_util::scan_empty;
-    use datafusion_proto::protobuf::LogicalPlanNode;
 
     use crate::scheduler_server::event::QueryStageSchedulerEvent;
     use crate::scheduler_server::SchedulerServer;
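
In update_datafusion_context the SessionConfig fields are no longer mutated one by one; a fresh config is built with the with_* builder methods and assigned wholesale (hence the TODO noting that the interface does not yet support in-place updates). A standalone sketch with placeholder values; the real code reads them from BallistaConfig:

    use datafusion::prelude::SessionConfig;

    // Placeholder values; update_datafusion_context pulls these from
    // the BallistaConfig for the session.
    let config = SessionConfig::default()
        .with_target_partitions(4)
        .with_batch_size(8192)
        .with_repartition_joins(true)
        .with_repartition_aggregations(true)
        .with_repartition_windows(true)
        .with_parquet_pruning(true);
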
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 9b12889b..d2797322 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -33,8 +33,8 @@ snmalloc = ["snmalloc-rs"]
 
 [dependencies]
 ballista = { path = "../ballista/rust/client" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
 env_logger = "0.9"
 futures = "0.3"
 mimalloc = { version = "0.1", optional = true, default-features = false }
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index 5070aed2..c20194ac 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -35,7 +35,7 @@ required-features = ["ballista/standalone"]
 
 [dependencies]
 ballista = { path = "../ballista/rust/client", version = "0.7.0" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
 futures = "0.3"
 num_cpus = "1.13.0"
 prost = "0.10"