Posted to commits@arrow.apache.org by av...@apache.org on 2023/01/31 17:47:05 UTC

[arrow-ballista] branch main updated: Upgrade to DataFusion 17 (#639)

This is an automated email from the ASF dual-hosted git repository.

avantgardner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 1b0be751 Upgrade to DataFusion 17 (#639)
1b0be751 is described below

commit 1b0be75127f508d81431516dccf92f74bafc98c5
Author: Brent Gardner <bg...@squarelabs.net>
AuthorDate: Tue Jan 31 10:46:59 2023 -0700

    Upgrade to DataFusion 17 (#639)
    
    * Upgrade to DF 17
    
    * Restore original error handling functionality
---
 ballista-cli/Cargo.toml                       |   4 +-
 ballista/client/Cargo.toml                    |   4 +-
 ballista/core/Cargo.toml                      |   6 +-
 ballista/core/src/client.rs                   |   2 +-
 ballista/core/src/serde/generated/ballista.rs | 105 ++++++++++++++++++++++++++
 ballista/core/src/serde/mod.rs                |   6 +-
 ballista/executor/Cargo.toml                  |   8 +-
 ballista/executor/src/flight_service.rs       |  30 ++++----
 ballista/scheduler/Cargo.toml                 |   6 +-
 ballista/scheduler/src/flight_sql.rs          |  46 ++++++-----
 benchmarks/Cargo.toml                         |   4 +-
 benchmarks/src/bin/tpch.rs                    |   4 +-
 examples/Cargo.toml                           |   2 +-
 python/Cargo.toml                             |   6 +-
 14 files changed, 170 insertions(+), 63 deletions(-)

diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index a726b4fa..48261075 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -33,8 +33,8 @@ ballista = { path = "../ballista/client", version = "0.10.0", features = [
     "standalone",
 ] }
 clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = "16.1.0"
-datafusion-cli = "16.1.0"
+datafusion = "17.0.0"
+datafusion-cli = "17.0.0"
 dirs = "4.0.0"
 env_logger = "0.10"
 mimalloc = { version = "0.1", default-features = false }
diff --git a/ballista/client/Cargo.toml b/ballista/client/Cargo.toml
index d262e5ba..f57a6f2b 100644
--- a/ballista/client/Cargo.toml
+++ b/ballista/client/Cargo.toml
@@ -31,8 +31,8 @@ rust-version = "1.63"
 ballista-core = { path = "../core", version = "0.10.0" }
 ballista-executor = { path = "../executor", version = "0.10.0", optional = true }
 ballista-scheduler = { path = "../scheduler", version = "0.10.0", optional = true }
-datafusion = "16.1.0"
-datafusion-proto = "16.1.0"
+datafusion = "17.0.0"
+datafusion-proto = "17.0.0"
 futures = "0.3"
 log = "0.4"
 parking_lot = "0.12"
diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml
index 27b28a5a..09272958 100644
--- a/ballista/core/Cargo.toml
+++ b/ballista/core/Cargo.toml
@@ -46,13 +46,13 @@ simd = ["datafusion/simd"]
 [dependencies]
 ahash = { version = "0.8", default-features = false }
 
-arrow-flight = { version = "29.0.0", features = ["flight-sql-experimental"] }
+arrow-flight = { version = "31.0.0", features = ["flight-sql-experimental"] }
 async-trait = "0.1.41"
 chrono = { version = "0.4", default-features = false }
 clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = "16.1.0"
+datafusion = "17.0.0"
 datafusion-objectstore-hdfs = { version = "0.1.1", default-features = false, optional = true }
-datafusion-proto = "16.1.0"
+datafusion-proto = "17.0.0"
 futures = "0.3"
 hashbrown = "0.13"
 
diff --git a/ballista/core/src/client.rs b/ballista/core/src/client.rs
index a9e616e8..d91c6fac 100644
--- a/ballista/core/src/client.rs
+++ b/ballista/core/src/client.rs
@@ -116,7 +116,7 @@ impl BallistaClient {
             .encode(&mut buf)
             .map_err(|e| BallistaError::GrpcActionError(format!("{e:?}")))?;
 
-        let request = tonic::Request::new(Ticket { ticket: buf });
+        let request = tonic::Request::new(Ticket { ticket: buf.into() });
 
         let mut stream = self
             .flight_client
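
For context on the client.rs change above: arrow-flight 31 switches the generated Flight message fields from `Vec<u8>` to `bytes::Bytes`, which is why the encoded buffer now gains an `.into()`. A minimal sketch (function name ours) of building such a request:

    use arrow_flight::Ticket;

    fn ticket_request(buf: Vec<u8>) -> tonic::Request<Ticket> {
        // `Vec<u8>` converts into `bytes::Bytes` by taking over the allocation.
        tonic::Request::new(Ticket { ticket: buf.into() })
    }
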
diff --git a/ballista/core/src/serde/generated/ballista.rs b/ballista/core/src/serde/generated/ballista.rs
index 3fc305ee..0c4de2e8 100644
--- a/ballista/core/src/serde/generated/ballista.rs
+++ b/ballista/core/src/serde/generated/ballista.rs
@@ -1,6 +1,7 @@
 /// /////////////////////////////////////////////////////////////////////////////////////////////////
 /// Ballista Physical Plan
 /// /////////////////////////////////////////////////////////////////////////////////////////////////
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct BallistaPhysicalPlanNode {
     #[prost(oneof = "ballista_physical_plan_node::PhysicalPlanType", tags = "1, 2, 3")]
@@ -10,6 +11,7 @@ pub struct BallistaPhysicalPlanNode {
 }
 /// Nested message and enum types in `BallistaPhysicalPlanNode`.
 pub mod ballista_physical_plan_node {
+    #[allow(clippy::derive_partial_eq_without_eq)]
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum PhysicalPlanType {
         #[prost(message, tag = "1")]
@@ -20,6 +22,7 @@ pub mod ballista_physical_plan_node {
         UnresolvedShuffle(super::UnresolvedShuffleExecNode),
     }
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ShuffleWriterExecNode {
     /// TODO it seems redundant to provide job and stage id here since we also have them
@@ -35,6 +38,7 @@ pub struct ShuffleWriterExecNode {
         ::datafusion_proto::protobuf::PhysicalHashRepartition,
     >,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct UnresolvedShuffleExecNode {
     #[prost(uint32, tag = "1")]
@@ -46,6 +50,7 @@ pub struct UnresolvedShuffleExecNode {
     #[prost(uint32, tag = "4")]
     pub output_partition_count: u32,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ShuffleReaderExecNode {
     #[prost(message, repeated, tag = "1")]
@@ -53,6 +58,7 @@ pub struct ShuffleReaderExecNode {
     #[prost(message, optional, tag = "2")]
     pub schema: ::core::option::Option<::datafusion_proto::protobuf::Schema>,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ShuffleReaderPartition {
     /// each partition of a shuffle read can read data from multiple locations
@@ -62,6 +68,7 @@ pub struct ShuffleReaderPartition {
 /// /////////////////////////////////////////////////////////////////////////////////////////////////
 /// Ballista Scheduling
 /// /////////////////////////////////////////////////////////////////////////////////////////////////
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutionGraph {
     #[prost(string, tag = "1")]
@@ -91,6 +98,7 @@ pub struct ExecutionGraph {
     #[prost(uint64, tag = "13")]
     pub queued_at: u64,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct StageAttempts {
     #[prost(uint32, tag = "1")]
@@ -98,6 +106,7 @@ pub struct StageAttempts {
     #[prost(uint32, repeated, tag = "2")]
     pub stage_attempt_num: ::prost::alloc::vec::Vec<u32>,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutionGraphStage {
     #[prost(oneof = "execution_graph_stage::StageType", tags = "1, 2, 3, 4")]
@@ -105,6 +114,7 @@ pub struct ExecutionGraphStage {
 }
 /// Nested message and enum types in `ExecutionGraphStage`.
 pub mod execution_graph_stage {
+    #[allow(clippy::derive_partial_eq_without_eq)]
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum StageType {
         #[prost(message, tag = "1")]
@@ -117,6 +127,7 @@ pub mod execution_graph_stage {
         FailedStage(super::FailedStage),
     }
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct UnResolvedStage {
     #[prost(uint32, tag = "1")]
@@ -138,6 +149,7 @@ pub struct UnResolvedStage {
         ::prost::alloc::string::String,
     >,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ResolvedStage {
     #[prost(uint32, tag = "1")]
@@ -161,6 +173,7 @@ pub struct ResolvedStage {
         ::prost::alloc::string::String,
     >,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct SuccessfulStage {
     #[prost(uint32, tag = "1")]
@@ -184,6 +197,7 @@ pub struct SuccessfulStage {
     #[prost(uint32, tag = "9")]
     pub stage_attempt_num: u32,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct FailedStage {
     #[prost(uint32, tag = "1")]
@@ -207,6 +221,7 @@ pub struct FailedStage {
     #[prost(uint32, tag = "9")]
     pub stage_attempt_num: u32,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct TaskInfo {
     #[prost(uint32, tag = "1")]
@@ -233,6 +248,7 @@ pub struct TaskInfo {
 }
 /// Nested message and enum types in `TaskInfo`.
 pub mod task_info {
+    #[allow(clippy::derive_partial_eq_without_eq)]
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum Status {
         #[prost(message, tag = "8")]
@@ -243,6 +259,7 @@ pub mod task_info {
         Successful(super::SuccessfulTask),
     }
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct GraphStageInput {
     #[prost(uint32, tag = "1")]
@@ -252,6 +269,7 @@ pub struct GraphStageInput {
     #[prost(bool, tag = "3")]
     pub complete: bool,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct TaskInputPartitions {
     #[prost(uint32, tag = "1")]
@@ -259,6 +277,7 @@ pub struct TaskInputPartitions {
     #[prost(message, repeated, tag = "2")]
     pub partition_location: ::prost::alloc::vec::Vec<PartitionLocation>,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct KeyValuePair {
     #[prost(string, tag = "1")]
@@ -266,6 +285,7 @@ pub struct KeyValuePair {
     #[prost(string, tag = "2")]
     pub value: ::prost::alloc::string::String,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct Action {
     /// configuration settings
@@ -276,6 +296,7 @@ pub struct Action {
 }
 /// Nested message and enum types in `Action`.
 pub mod action {
+    #[allow(clippy::derive_partial_eq_without_eq)]
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum ActionType {
         /// Fetch a partition from an executor
@@ -283,6 +304,7 @@ pub mod action {
         FetchPartition(super::FetchPartition),
     }
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutePartition {
     #[prost(string, tag = "1")]
@@ -302,6 +324,7 @@ pub struct ExecutePartition {
         ::datafusion_proto::protobuf::PhysicalHashRepartition,
     >,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct FetchPartition {
     #[prost(string, tag = "1")]
@@ -317,6 +340,7 @@ pub struct FetchPartition {
     #[prost(uint32, tag = "6")]
     pub port: u32,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PartitionLocation {
     /// partition_id of the map stage who produces the shuffle.
@@ -333,6 +357,7 @@ pub struct PartitionLocation {
     pub path: ::prost::alloc::string::String,
 }
 /// Unique identifier for a materialized partition of data
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PartitionId {
     #[prost(string, tag = "1")]
@@ -342,6 +367,7 @@ pub struct PartitionId {
     #[prost(uint32, tag = "4")]
     pub partition_id: u32,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct TaskId {
     #[prost(uint32, tag = "1")]
@@ -351,6 +377,7 @@ pub struct TaskId {
     #[prost(uint32, tag = "3")]
     pub partition_id: u32,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PartitionStats {
     #[prost(int64, tag = "1")]
@@ -362,6 +389,7 @@ pub struct PartitionStats {
     #[prost(message, repeated, tag = "4")]
     pub column_stats: ::prost::alloc::vec::Vec<ColumnStats>,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ColumnStats {
     #[prost(message, optional, tag = "1")]
@@ -373,11 +401,13 @@ pub struct ColumnStats {
     #[prost(uint32, tag = "4")]
     pub distinct_count: u32,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct OperatorMetricsSet {
     #[prost(message, repeated, tag = "1")]
     pub metrics: ::prost::alloc::vec::Vec<OperatorMetric>,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct NamedCount {
     #[prost(string, tag = "1")]
@@ -385,6 +415,7 @@ pub struct NamedCount {
     #[prost(uint64, tag = "2")]
     pub value: u64,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct NamedGauge {
     #[prost(string, tag = "1")]
@@ -392,6 +423,7 @@ pub struct NamedGauge {
     #[prost(uint64, tag = "2")]
     pub value: u64,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct NamedTime {
     #[prost(string, tag = "1")]
@@ -399,6 +431,7 @@ pub struct NamedTime {
     #[prost(uint64, tag = "2")]
     pub value: u64,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct OperatorMetric {
     #[prost(oneof = "operator_metric::Metric", tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10")]
@@ -406,6 +439,7 @@ pub struct OperatorMetric {
 }
 /// Nested message and enum types in `OperatorMetric`.
 pub mod operator_metric {
+    #[allow(clippy::derive_partial_eq_without_eq)]
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum Metric {
         #[prost(uint64, tag = "1")]
@@ -431,6 +465,7 @@ pub mod operator_metric {
     }
 }
 /// Used by scheduler
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutorMetadata {
     #[prost(string, tag = "1")]
@@ -445,6 +480,7 @@ pub struct ExecutorMetadata {
     pub specification: ::core::option::Option<ExecutorSpecification>,
 }
 /// Used by grpc
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutorRegistration {
     #[prost(string, tag = "1")]
@@ -464,12 +500,14 @@ pub struct ExecutorRegistration {
 pub mod executor_registration {
     /// "optional" keyword is stable in protoc 3.15 but prost is still on 3.14 (see <https://github.com/tokio-rs/prost/issues/430> and <https://github.com/tokio-rs/prost/pull/455>)
     /// this syntax is ugly but is binary compatible with the "optional" keyword (see <https://stackoverflow.com/questions/42622015/how-to-define-an-optional-field-in-protobuf-3>)
+    #[allow(clippy::derive_partial_eq_without_eq)]
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum OptionalHost {
         #[prost(string, tag = "2")]
         Host(::prost::alloc::string::String),
     }
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutorHeartbeat {
     #[prost(string, tag = "1")]
@@ -482,6 +520,7 @@ pub struct ExecutorHeartbeat {
     #[prost(message, optional, tag = "4")]
     pub status: ::core::option::Option<ExecutorStatus>,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutorMetric {
     /// TODO add more metrics
@@ -491,12 +530,14 @@ pub struct ExecutorMetric {
 /// Nested message and enum types in `ExecutorMetric`.
 pub mod executor_metric {
     /// TODO add more metrics
+    #[allow(clippy::derive_partial_eq_without_eq)]
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum Metric {
         #[prost(uint64, tag = "1")]
         AvailableMemory(u64),
     }
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutorStatus {
     #[prost(oneof = "executor_status::Status", tags = "1, 2, 3")]
@@ -504,6 +545,7 @@ pub struct ExecutorStatus {
 }
 /// Nested message and enum types in `ExecutorStatus`.
 pub mod executor_status {
+    #[allow(clippy::derive_partial_eq_without_eq)]
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum Status {
         #[prost(string, tag = "1")]
@@ -514,11 +556,13 @@ pub mod executor_status {
         Unknown(::prost::alloc::string::String),
     }
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutorSpecification {
     #[prost(message, repeated, tag = "1")]
     pub resources: ::prost::alloc::vec::Vec<ExecutorResource>,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutorResource {
     /// TODO add more resources
@@ -528,12 +572,14 @@ pub struct ExecutorResource {
 /// Nested message and enum types in `ExecutorResource`.
 pub mod executor_resource {
     /// TODO add more resources
+    #[allow(clippy::derive_partial_eq_without_eq)]
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum Resource {
         #[prost(uint32, tag = "1")]
         TaskSlots(u32),
     }
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutorData {
     #[prost(string, tag = "1")]
@@ -541,6 +587,7 @@ pub struct ExecutorData {
     #[prost(message, repeated, tag = "2")]
     pub resources: ::prost::alloc::vec::Vec<ExecutorResourcePair>,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutorResourcePair {
     #[prost(message, optional, tag = "1")]
@@ -548,11 +595,13 @@ pub struct ExecutorResourcePair {
     #[prost(message, optional, tag = "2")]
     pub available: ::core::option::Option<ExecutorResource>,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct RunningTask {
     #[prost(string, tag = "1")]
     pub executor_id: ::prost::alloc::string::String,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct FailedTask {
     #[prost(string, tag = "1")]
@@ -567,6 +616,7 @@ pub struct FailedTask {
 }
 /// Nested message and enum types in `FailedTask`.
 pub mod failed_task {
+    #[allow(clippy::derive_partial_eq_without_eq)]
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum FailedReason {
         #[prost(message, tag = "4")]
@@ -584,6 +634,7 @@ pub mod failed_task {
         TaskKilled(super::TaskKilled),
     }
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct SuccessfulTask {
     #[prost(string, tag = "1")]
@@ -593,8 +644,10 @@ pub struct SuccessfulTask {
     #[prost(message, repeated, tag = "2")]
     pub partitions: ::prost::alloc::vec::Vec<ShuffleWritePartition>,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutionError {}
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct FetchPartitionError {
     #[prost(string, tag = "1")]
@@ -604,14 +657,19 @@ pub struct FetchPartitionError {
     #[prost(uint32, tag = "3")]
     pub map_partition_id: u32,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct IoError {}
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutorLost {}
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ResultLost {}
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct TaskKilled {}
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ShuffleWritePartition {
     #[prost(uint64, tag = "1")]
@@ -625,6 +683,7 @@ pub struct ShuffleWritePartition {
     #[prost(uint64, tag = "5")]
     pub num_bytes: u64,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct TaskStatus {
     #[prost(uint32, tag = "1")]
@@ -650,6 +709,7 @@ pub struct TaskStatus {
 }
 /// Nested message and enum types in `TaskStatus`.
 pub mod task_status {
+    #[allow(clippy::derive_partial_eq_without_eq)]
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum Status {
         #[prost(message, tag = "9")]
@@ -660,6 +720,7 @@ pub mod task_status {
         Successful(super::SuccessfulTask),
     }
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PollWorkParams {
     #[prost(message, optional, tag = "1")]
@@ -670,6 +731,7 @@ pub struct PollWorkParams {
     #[prost(message, repeated, tag = "3")]
     pub task_status: ::prost::alloc::vec::Vec<TaskStatus>,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct TaskDefinition {
     #[prost(uint32, tag = "1")]
@@ -699,6 +761,7 @@ pub struct TaskDefinition {
     pub props: ::prost::alloc::vec::Vec<KeyValuePair>,
 }
 /// A set of tasks in the same stage
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct MultiTaskDefinition {
     #[prost(message, repeated, tag = "1")]
@@ -723,11 +786,13 @@ pub struct MultiTaskDefinition {
     #[prost(message, repeated, tag = "9")]
     pub props: ::prost::alloc::vec::Vec<KeyValuePair>,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct SessionSettings {
     #[prost(message, repeated, tag = "1")]
     pub configs: ::prost::alloc::vec::Vec<KeyValuePair>,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct JobSessionConfig {
     #[prost(string, tag = "1")]
@@ -735,21 +800,25 @@ pub struct JobSessionConfig {
     #[prost(message, repeated, tag = "2")]
     pub configs: ::prost::alloc::vec::Vec<KeyValuePair>,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PollWorkResult {
     #[prost(message, repeated, tag = "1")]
     pub tasks: ::prost::alloc::vec::Vec<TaskDefinition>,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct RegisterExecutorParams {
     #[prost(message, optional, tag = "1")]
     pub metadata: ::core::option::Option<ExecutorRegistration>,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct RegisterExecutorResult {
     #[prost(bool, tag = "1")]
     pub success: bool,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct HeartBeatParams {
     #[prost(string, tag = "1")]
@@ -759,12 +828,14 @@ pub struct HeartBeatParams {
     #[prost(message, optional, tag = "3")]
     pub status: ::core::option::Option<ExecutorStatus>,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct HeartBeatResult {
     /// TODO it's from Spark for BlockManager
     #[prost(bool, tag = "1")]
     pub reregister: bool,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct StopExecutorParams {
     #[prost(string, tag = "1")]
@@ -776,8 +847,10 @@ pub struct StopExecutorParams {
     #[prost(bool, tag = "3")]
     pub force: bool,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct StopExecutorResult {}
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutorStoppedParams {
     #[prost(string, tag = "1")]
@@ -786,8 +859,10 @@ pub struct ExecutorStoppedParams {
     #[prost(string, tag = "2")]
     pub reason: ::prost::alloc::string::String,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutorStoppedResult {}
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct UpdateTaskStatusParams {
     #[prost(string, tag = "1")]
@@ -796,11 +871,13 @@ pub struct UpdateTaskStatusParams {
     #[prost(message, repeated, tag = "2")]
     pub task_status: ::prost::alloc::vec::Vec<TaskStatus>,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct UpdateTaskStatusResult {
     #[prost(bool, tag = "1")]
     pub success: bool,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecuteQueryParams {
     #[prost(message, repeated, tag = "4")]
@@ -814,6 +891,7 @@ pub struct ExecuteQueryParams {
 }
 /// Nested message and enum types in `ExecuteQueryParams`.
 pub mod execute_query_params {
+    #[allow(clippy::derive_partial_eq_without_eq)]
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum Query {
         #[prost(bytes, tag = "1")]
@@ -821,17 +899,20 @@ pub mod execute_query_params {
         #[prost(string, tag = "2")]
         Sql(::prost::alloc::string::String),
     }
+    #[allow(clippy::derive_partial_eq_without_eq)]
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum OptionalSessionId {
         #[prost(string, tag = "3")]
         SessionId(::prost::alloc::string::String),
     }
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecuteSqlParams {
     #[prost(string, tag = "1")]
     pub sql: ::prost::alloc::string::String,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecuteQueryResult {
     #[prost(string, tag = "1")]
@@ -839,26 +920,32 @@ pub struct ExecuteQueryResult {
     #[prost(string, tag = "2")]
     pub session_id: ::prost::alloc::string::String,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct GetJobStatusParams {
     #[prost(string, tag = "1")]
     pub job_id: ::prost::alloc::string::String,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct SuccessfulJob {
     #[prost(message, repeated, tag = "1")]
     pub partition_location: ::prost::alloc::vec::Vec<PartitionLocation>,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct QueuedJob {}
 /// TODO: add progress report
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct RunningJob {}
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct FailedJob {
     #[prost(string, tag = "1")]
     pub error: ::prost::alloc::string::String,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct JobStatus {
     #[prost(oneof = "job_status::Status", tags = "1, 2, 3, 4")]
@@ -866,6 +953,7 @@ pub struct JobStatus {
 }
 /// Nested message and enum types in `JobStatus`.
 pub mod job_status {
+    #[allow(clippy::derive_partial_eq_without_eq)]
     #[derive(Clone, PartialEq, ::prost::Oneof)]
     pub enum Status {
         #[prost(message, tag = "1")]
@@ -878,11 +966,13 @@ pub mod job_status {
         Successful(super::SuccessfulJob),
     }
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct GetJobStatusResult {
     #[prost(message, optional, tag = "1")]
     pub status: ::core::option::Option<JobStatus>,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct GetFileMetadataParams {
     #[prost(string, tag = "1")]
@@ -890,33 +980,40 @@ pub struct GetFileMetadataParams {
     #[prost(string, tag = "2")]
     pub file_type: ::prost::alloc::string::String,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct GetFileMetadataResult {
     #[prost(message, optional, tag = "1")]
     pub schema: ::core::option::Option<::datafusion_proto::protobuf::Schema>,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct FilePartitionMetadata {
     #[prost(string, repeated, tag = "1")]
     pub filename: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct CancelJobParams {
     #[prost(string, tag = "1")]
     pub job_id: ::prost::alloc::string::String,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct CancelJobResult {
     #[prost(bool, tag = "1")]
     pub cancelled: bool,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct CleanJobDataParams {
     #[prost(string, tag = "1")]
     pub job_id: ::prost::alloc::string::String,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct CleanJobDataResult {}
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct LaunchTaskParams {
     /// Allow to launch a task set to an executor at once
@@ -925,6 +1022,7 @@ pub struct LaunchTaskParams {
     #[prost(string, tag = "2")]
     pub scheduler_id: ::prost::alloc::string::String,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct LaunchMultiTaskParams {
     /// Allow to launch a task set to an executor at once
@@ -933,35 +1031,42 @@ pub struct LaunchMultiTaskParams {
     #[prost(string, tag = "2")]
     pub scheduler_id: ::prost::alloc::string::String,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct LaunchTaskResult {
     /// TODO when part of the task set are scheduled successfully
     #[prost(bool, tag = "1")]
     pub success: bool,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct LaunchMultiTaskResult {
     /// TODO when part of the task set are scheduled successfully
     #[prost(bool, tag = "1")]
     pub success: bool,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct CancelTasksParams {
     #[prost(message, repeated, tag = "1")]
     pub task_infos: ::prost::alloc::vec::Vec<RunningTaskInfo>,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct CancelTasksResult {
     #[prost(bool, tag = "1")]
     pub cancelled: bool,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct RemoveJobDataParams {
     #[prost(string, tag = "1")]
     pub job_id: ::prost::alloc::string::String,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct RemoveJobDataResult {}
+#[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct RunningTaskInfo {
     #[prost(uint32, tag = "1")]
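
The blanket `#[allow(clippy::derive_partial_eq_without_eq)]` attributes above appear to be regenerated protobuf output rather than hand edits. One way such an attribute ends up on every generated type is a blanket `type_attribute` in the build script; the sketch below is illustrative only, and the proto path is hypothetical rather than Ballista's actual layout:

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        tonic_build::configure()
            // Attach the allow to every generated type so clippy does not flag
            // `PartialEq` derives that could also derive `Eq`.
            .type_attribute(".", "#[allow(clippy::derive_partial_eq_without_eq)]")
            .compile(&["proto/ballista.proto"], &["proto"])?;
        Ok(())
    }
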
diff --git a/ballista/core/src/serde/mod.rs b/ballista/core/src/serde/mod.rs
index 96fdbfc6..1b277018 100644
--- a/ballista/core/src/serde/mod.rs
+++ b/ballista/core/src/serde/mod.rs
@@ -52,10 +52,10 @@ impl ProstMessageExt for protobuf::Action {
         "type.googleapis.com/arrow.flight.protocol.sql.Action"
     }
 
-    fn as_any(&self) -> prost_types::Any {
-        prost_types::Any {
+    fn as_any(&self) -> arrow_flight::sql::Any {
+        arrow_flight::sql::Any {
             type_url: protobuf::Action::type_url().to_string(),
-            value: self.encode_to_vec(),
+            value: self.encode_to_vec().into(),
         }
     }
 }
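
The serde/mod.rs change tracks arrow-flight 31, where Flight SQL messages are packed into `arrow_flight::sql::Any` (whose `value` field is `bytes::Bytes`) instead of `prost_types::Any`. A sketch of the same pattern for a hypothetical message type (`MyAction` is ours, not part of Ballista):

    use arrow_flight::sql::{Any, ProstMessageExt};
    use prost::Message;

    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct MyAction {
        #[prost(string, tag = "1")]
        pub job_id: ::prost::alloc::string::String,
    }

    impl ProstMessageExt for MyAction {
        fn type_url() -> &'static str {
            "type.googleapis.com/example.MyAction"
        }

        fn as_any(&self) -> Any {
            Any {
                type_url: MyAction::type_url().to_string(),
                // `value` is `bytes::Bytes` now, hence the `.into()`.
                value: self.encode_to_vec().into(),
            }
        }
    }
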
diff --git a/ballista/executor/Cargo.toml b/ballista/executor/Cargo.toml
index 24fe4a3a..7849d1e5 100644
--- a/ballista/executor/Cargo.toml
+++ b/ballista/executor/Cargo.toml
@@ -38,15 +38,15 @@ default = ["mimalloc"]
 
 [dependencies]
 anyhow = "1"
-arrow = { version = "29.0.0" }
-arrow-flight = { version = "29.0.0" }
+arrow = { version = "31.0.0" }
+arrow-flight = { version = "31.0.0" }
 async-trait = "0.1.41"
 ballista-core = { path = "../core", version = "0.10.0" }
 chrono = { version = "0.4", default-features = false }
 configure_me = "0.4.0"
 dashmap = "5.4.0"
-datafusion = "16.1.0"
-datafusion-proto = "16.1.0"
+datafusion = "17.0.0"
+datafusion-proto = "17.0.0"
 futures = "0.3"
 hyper = "0.14.4"
 log = "0.4"
diff --git a/ballista/executor/src/flight_service.rs b/ballista/executor/src/flight_service.rs
index c2310b32..4a62bc5f 100644
--- a/ballista/executor/src/flight_service.rs
+++ b/ballista/executor/src/flight_service.rs
@@ -26,14 +26,14 @@ use ballista_core::error::BallistaError;
 use ballista_core::serde::decode_protobuf;
 use ballista_core::serde::scheduler::Action as BallistaAction;
 
+use arrow::ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
 use arrow_flight::{
     flight_service_server::FlightService, Action, ActionType, Criteria, Empty,
     FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse,
     PutResult, SchemaResult, Ticket,
 };
 use datafusion::arrow::{
-    error::ArrowError, ipc::reader::FileReader, ipc::writer::IpcWriteOptions,
-    record_batch::RecordBatch,
+    error::ArrowError, ipc::reader::FileReader, record_batch::RecordBatch,
 };
 use futures::{Stream, StreamExt};
 use log::{debug, info, warn};
@@ -142,7 +142,7 @@ impl FlightService for BallistaFlightService {
 
         let result = HandshakeResponse {
             protocol_version: 0,
-            payload: token.as_bytes().to_vec(),
+            payload: token.as_bytes().to_vec().into(),
         };
         let result = Ok(result);
         let output = futures::stream::iter(vec![result]);
@@ -182,8 +182,7 @@ impl FlightService for BallistaFlightService {
     ) -> Result<Response<Self::DoActionStream>, Status> {
         let action = request.into_inner();
 
-        let _action =
-            decode_protobuf(&action.body.to_vec()).map_err(|e| from_ballista_err(&e))?;
+        let _action = decode_protobuf(&action.body).map_err(|e| from_ballista_err(&e))?;
 
         Err(Status::unimplemented("do_action"))
     }
@@ -209,14 +208,19 @@ fn create_flight_iter(
     batch: &RecordBatch,
     options: &IpcWriteOptions,
 ) -> Box<dyn Iterator<Item = Result<FlightData, Status>>> {
-    let (flight_dictionaries, flight_batch) =
-        arrow_flight::utils::flight_data_from_arrow_batch(batch, options);
-    Box::new(
-        flight_dictionaries
-            .into_iter()
-            .chain(std::iter::once(flight_batch))
-            .map(Ok),
-    )
+    let data_gen = IpcDataGenerator::default();
+    let mut dictionary_tracker = DictionaryTracker::new(false);
+    let res = data_gen.encoded_batch(batch, &mut dictionary_tracker, options);
+    match res {
+        Ok((dicts, batch)) => {
+            let flights = dicts
+                .into_iter()
+                .chain(std::iter::once(batch))
+                .map(|x| x.into());
+            Box::new(flights.map(Ok))
+        }
+        Err(e) => Box::new(std::iter::once(Err(from_arrow_err(&e)))),
+    }
 }
 
 async fn stream_flight_data<T>(
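
The flight_service.rs rewrite above moves off the `flight_data_from_arrow_batch` helper and drives the Arrow IPC encoder directly. A standalone sketch of the same conversion (function name ours):

    use arrow::error::ArrowError;
    use arrow::ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
    use arrow::record_batch::RecordBatch;
    use arrow_flight::FlightData;

    fn batch_to_flight_data(
        batch: &RecordBatch,
        options: &IpcWriteOptions,
    ) -> Result<Vec<FlightData>, ArrowError> {
        let data_gen = IpcDataGenerator::default();
        // `false`: do not error if a dictionary is later replaced.
        let mut dictionary_tracker = DictionaryTracker::new(false);
        let (dictionaries, batch) =
            data_gen.encoded_batch(batch, &mut dictionary_tracker, options)?;
        // Each `EncodedData` converts into a `FlightData` message; dictionary
        // batches must precede the record batch that references them.
        Ok(dictionaries
            .into_iter()
            .chain(std::iter::once(batch))
            .map(Into::into)
            .collect())
    }
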
diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml
index d3ab965f..64da40d7 100644
--- a/ballista/scheduler/Cargo.toml
+++ b/ballista/scheduler/Cargo.toml
@@ -43,7 +43,7 @@ sled = ["sled_package", "tokio-stream"]
 
 [dependencies]
 anyhow = "1"
-arrow-flight = { version = "29.0.0", features = ["flight-sql-experimental"] }
+arrow-flight = { version = "31.0.0", features = ["flight-sql-experimental"] }
 async-recursion = "1.0.0"
 async-trait = "0.1.41"
 ballista-core = { path = "../core", version = "0.10.0" }
@@ -51,8 +51,8 @@ base64 = { version = "0.13", default-features = false }
 clap = { version = "3", features = ["derive", "cargo"] }
 configure_me = "0.4.0"
 dashmap = "5.4.0"
-datafusion = "16.1.0"
-datafusion-proto = "16.1.0"
+datafusion = "17.0.0"
+datafusion-proto = "17.0.0"
 etcd-client = { version = "0.10", optional = true }
 flatbuffers = { version = "22.9.29" }
 futures = "0.3"
diff --git a/ballista/scheduler/src/flight_sql.rs b/ballista/scheduler/src/flight_sql.rs
index 46187e9a..ca755fd8 100644
--- a/ballista/scheduler/src/flight_sql.rs
+++ b/ballista/scheduler/src/flight_sql.rs
@@ -24,7 +24,7 @@ use arrow_flight::sql::{
     CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
     CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
     CommandPreparedStatementQuery, CommandPreparedStatementUpdate, CommandStatementQuery,
-    CommandStatementUpdate, ProstAnyExt, SqlInfo, TicketStatementQuery,
+    CommandStatementUpdate, SqlInfo, TicketStatementQuery,
 };
 use arrow_flight::{
     Action, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest,
@@ -42,7 +42,7 @@ use tonic::{Request, Response, Status, Streaming};
 use crate::scheduler_server::SchedulerServer;
 use arrow_flight::flight_service_client::FlightServiceClient;
 use arrow_flight::sql::ProstMessageExt;
-use arrow_flight::utils::flight_data_from_arrow_batch;
+use arrow_flight::utils::batches_to_flight_data;
 use arrow_flight::SchemaAsIpc;
 use ballista_core::config::BallistaConfig;
 use ballista_core::serde::protobuf;
@@ -289,7 +289,7 @@ impl FlightSqlServiceImpl {
                 uri: format!("grpc+tcp://{authority}"),
             };
             let buf = fetch.as_any().encode_to_vec();
-            let ticket = Ticket { ticket: buf };
+            let ticket = Ticket { ticket: buf.into() };
             let fiep = FlightEndpoint {
                 ticket: Some(ticket),
                 location: vec![loc],
@@ -318,7 +318,7 @@ impl FlightSqlServiceImpl {
             uri: format!("grpc+tcp://{authority}"),
         };
         let buf = fetch.as_any().encode_to_vec();
-        let ticket = Ticket { ticket: buf };
+        let ticket = Ticket { ticket: buf.into() };
         let fiep = FlightEndpoint {
             ticket: Some(ticket),
             location: vec![loc],
@@ -391,11 +391,11 @@ impl FlightSqlServiceImpl {
     ) -> Response<FlightInfo> {
         let flight_desc = FlightDescriptor {
             r#type: DescriptorType::Cmd.into(),
-            cmd: vec![],
+            cmd: Vec::new().into(),
             path: vec![],
         };
         let info = FlightInfo {
-            schema: schema_bytes,
+            schema: schema_bytes.into(),
             flight_descriptor: Some(flight_desc),
             endpoint: fieps,
             total_records: num_rows,
@@ -434,20 +434,16 @@ impl FlightSqlServiceImpl {
     }
 
     async fn record_batch_to_resp(
-        rb: &RecordBatch,
+        rb: RecordBatch,
     ) -> Result<
         Response<Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send>>>,
         Status,
     > {
         type FlightResult = Result<FlightData, Status>;
         let (tx, rx): (Sender<FlightResult>, Receiver<FlightResult>) = channel(2);
-        let options = IpcWriteOptions::default();
-        let schema = SchemaAsIpc::new(rb.schema().as_ref(), &options).into();
-        tx.send(Ok(schema))
-            .await
-            .map_err(|_| Status::internal("Error sending schema".to_string()))?;
-        let (dict, flight) = flight_data_from_arrow_batch(rb, &options);
-        let flights = dict.into_iter().chain(std::iter::once(flight));
+        let schema = (*rb.schema()).clone();
+        let flights = batches_to_flight_data(schema, vec![rb])
+            .map_err(|_| Status::internal("Error encoding batches".to_string()))?;
         for flight in flights {
             tx.send(Ok(flight))
                 .await
@@ -521,7 +517,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
 
         let result = HandshakeResponse {
             protocol_version: 0,
-            payload: token.as_bytes().to_vec(),
+            payload: token.as_bytes().to_vec().into(),
         };
         let result = Ok(result);
         let output = futures::stream::iter(vec![result]);
@@ -537,7 +533,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
     async fn do_get_fallback(
         &self,
         request: Request<Ticket>,
-        message: prost_types::Any,
+        message: arrow_flight::sql::Any,
     ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
         debug!("do_get_fallback type_url: {}", message.type_url);
         let ctx = self.get_ctx(&request)?;
@@ -564,7 +560,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
                 let rb = FlightSqlServiceImpl::table_types().map_err(|_| {
                     Status::internal("Error getting table types".to_string())
                 })?;
-                let resp = Self::record_batch_to_resp(&rb).await?;
+                let resp = Self::record_batch_to_resp(rb).await?;
                 return Ok(resp);
             }
             "get_flight_info_tables" => {
@@ -572,7 +568,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
                 let rb = self
                     .tables(ctx)
                     .map_err(|_| Status::internal("Error getting tables".to_string()))?;
-                let resp = Self::record_batch_to_resp(&rb).await?;
+                let resp = Self::record_batch_to_resp(rb).await?;
                 return Ok(resp);
             }
             _ => {}
@@ -591,7 +587,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
                 })?;
         let mut flight_client = FlightServiceClient::new(connection);
         let buf = action.encode_to_vec();
-        let request = Request::new(Ticket { ticket: buf });
+        let request = Request::new(Ticket { ticket: buf.into() });
 
         let stream = flight_client
             .do_get(request)
@@ -623,7 +619,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
     ) -> Result<Response<FlightInfo>, Status> {
         debug!("get_flight_info_prepared_statement");
         let ctx = self.get_ctx(&request)?;
-        let handle = Uuid::from_slice(handle.prepared_statement_handle.as_slice())
+        let handle = Uuid::from_slice(handle.prepared_statement_handle.as_ref())
             .map_err(|e| Status::internal(format!("Error decoding handle: {e}")))?;
         let plan = self.get_plan(&handle)?;
         let resp = self.execute_plan(ctx, &plan).await?;
@@ -845,7 +841,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
     ) -> Result<i64, Status> {
         debug!("do_put_prepared_statement_update");
         let ctx = self.get_ctx(&request)?;
-        let handle = Uuid::from_slice(handle.prepared_statement_handle.as_slice())
+        let handle = Uuid::from_slice(handle.prepared_statement_handle.as_ref())
             .map_err(|e| Status::internal(format!("Error decoding handle: {e}")))?;
         let plan = self.get_plan(&handle)?;
         let _ = self.execute_plan(ctx, &plan).await?;
@@ -865,9 +861,9 @@ impl FlightSqlService for FlightSqlServiceImpl {
         let handle = self.cache_plan(plan)?;
         debug!("Prepared statement {}:\n{}", handle, query.query);
         let res = ActionCreatePreparedStatementResult {
-            prepared_statement_handle: handle.as_bytes().to_vec(),
-            dataset_schema: schema_bytes,
-            parameter_schema: vec![], // TODO: parameters
+            prepared_statement_handle: handle.as_bytes().to_vec().into(),
+            dataset_schema: schema_bytes.into(),
+            parameter_schema: Vec::new().into(), // TODO: parameters
         };
         Ok(res)
     }
@@ -878,7 +874,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
         _request: Request<Action>,
     ) {
         debug!("do_action_close_prepared_statement");
-        let handle = Uuid::from_slice(handle.prepared_statement_handle.as_slice());
+        let handle = Uuid::from_slice(handle.prepared_statement_handle.as_ref());
         let handle = if let Ok(handle) = handle {
             debug!("Closing {}", handle);
             handle
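
Two API notes on the flight_sql.rs changes above: prepared-statement handles are now `bytes::Bytes` (hence `as_ref()` in place of `as_slice()`), and `batches_to_flight_data` encodes the schema message itself, which is why the separate `SchemaAsIpc` send was dropped. A sketch of the latter (function name ours):

    use arrow::error::ArrowError;
    use arrow::record_batch::RecordBatch;
    use arrow_flight::utils::batches_to_flight_data;
    use arrow_flight::FlightData;

    fn batch_to_flights(rb: RecordBatch) -> Result<Vec<FlightData>, ArrowError> {
        // The helper takes an owned `Schema`, so clone it out of the batch's Arc.
        let schema = (*rb.schema()).clone();
        // The returned Vec starts with the encoded schema, followed by any
        // dictionary batches and then the record batch itself.
        batches_to_flight_data(schema, vec![rb])
    }
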
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 3fa9e0a5..0f437d07 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -34,8 +34,8 @@ snmalloc = ["snmalloc-rs"]
 
 [dependencies]
 ballista = { path = "../ballista/client", version = "0.10.0" }
-datafusion = "16.1.0"
-datafusion-proto = "16.1.0"
+datafusion = "17.0.0"
+datafusion-proto = "17.0.0"
 env_logger = "0.10"
 futures = "0.3"
 mimalloc = { version = "0.1", optional = true, default-features = false }
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index e6f65708..40b2ccb9 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -1553,6 +1553,7 @@ mod tests {
         use super::*;
         use ballista_core::serde::BallistaCodec;
         use datafusion::datasource::listing::ListingTableUrl;
+        use datafusion::execution::options::ReadOptions;
         use datafusion::physical_plan::ExecutionPlan;
         use datafusion_proto::logical_plan::AsLogicalPlan;
         use datafusion_proto::physical_plan::AsExecutionPlan;
@@ -1582,7 +1583,8 @@ mod tests {
                     .delimiter(b'|')
                     .has_header(false)
                     .file_extension(".tbl");
-                let listing_options = options.to_listing_options(1);
+                let cfg = SessionConfig::new();
+                let listing_options = options.to_listing_options(&cfg);
                 let config = ListingTableConfig::new(path.clone())
                     .with_listing_options(listing_options)
                     .with_schema(Arc::new(schema));
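
The benchmark fix above reflects a DataFusion 17 signature change: `to_listing_options` (from the `ReadOptions` trait) now takes a `&SessionConfig` rather than a target-partition count. A sketch under that assumption; the `with_target_partitions(1)` call is our stand-in for the old hard-coded `1`:

    use datafusion::datasource::listing::ListingOptions;
    use datafusion::execution::options::ReadOptions;
    use datafusion::prelude::{CsvReadOptions, SessionConfig};

    fn tbl_listing_options() -> ListingOptions {
        let options = CsvReadOptions::new()
            .delimiter(b'|')
            .has_header(false)
            .file_extension(".tbl");
        // Partitioning now comes from the session configuration.
        let cfg = SessionConfig::new().with_target_partitions(1);
        options.to_listing_options(&cfg)
    }
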
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index e38a58ec..18fbaad5 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -35,7 +35,7 @@ required-features = ["ballista/standalone"]
 
 [dependencies]
 ballista = { path = "../ballista/client", version = "0.10.0" }
-datafusion = "16.1.0"
+datafusion = "17.0.0"
 futures = "0.3"
 num_cpus = "1.13.0"
 prost = "0.11"
diff --git a/python/Cargo.toml b/python/Cargo.toml
index 59a9f847..9d7d81d9 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -36,9 +36,9 @@ default = ["mimalloc"]
 [dependencies]
 async-trait = "0.1"
 ballista = { path = "../ballista/client", version = "0.10.0" }
-datafusion = { version = "16.1.0", features = ["pyarrow"] }
-datafusion-common = "16.1.0"
-datafusion-expr = "16.1.0"
+datafusion = { version = "17.0.0", features = ["pyarrow"] }
+datafusion-common = "17.0.0"
+datafusion-expr = "17.0.0"
 futures = "0.3"
 mimalloc = { version = "*", optional = true, default-features = false }
 pyo3 = { version = "~0.17.1", features = ["extension-module", "abi3", "abi3-py37"] }