You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by av...@apache.org on 2023/01/31 17:47:05 UTC
[arrow-ballista] branch main updated: Upgrade to DataFusion 17 (#639)
This is an automated email from the ASF dual-hosted git repository.
avantgardner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 1b0be751 Upgrade to DataFusion 17 (#639)
1b0be751 is described below
commit 1b0be75127f508d81431516dccf92f74bafc98c5
Author: Brent Gardner <bg...@squarelabs.net>
AuthorDate: Tue Jan 31 10:46:59 2023 -0700
Upgrade to DataFusion 17 (#639)
* Upgrade to DF 17
* Restore original error handling functionality
---
ballista-cli/Cargo.toml | 4 +-
ballista/client/Cargo.toml | 4 +-
ballista/core/Cargo.toml | 6 +-
ballista/core/src/client.rs | 2 +-
ballista/core/src/serde/generated/ballista.rs | 105 ++++++++++++++++++++++++++
ballista/core/src/serde/mod.rs | 6 +-
ballista/executor/Cargo.toml | 8 +-
ballista/executor/src/flight_service.rs | 30 ++++----
ballista/scheduler/Cargo.toml | 6 +-
ballista/scheduler/src/flight_sql.rs | 46 ++++++-----
benchmarks/Cargo.toml | 4 +-
benchmarks/src/bin/tpch.rs | 4 +-
examples/Cargo.toml | 2 +-
python/Cargo.toml | 6 +-
14 files changed, 170 insertions(+), 63 deletions(-)
diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index a726b4fa..48261075 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -33,8 +33,8 @@ ballista = { path = "../ballista/client", version = "0.10.0", features = [
"standalone",
] }
clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = "16.1.0"
-datafusion-cli = "16.1.0"
+datafusion = "17.0.0"
+datafusion-cli = "17.0.0"
dirs = "4.0.0"
env_logger = "0.10"
mimalloc = { version = "0.1", default-features = false }
diff --git a/ballista/client/Cargo.toml b/ballista/client/Cargo.toml
index d262e5ba..f57a6f2b 100644
--- a/ballista/client/Cargo.toml
+++ b/ballista/client/Cargo.toml
@@ -31,8 +31,8 @@ rust-version = "1.63"
ballista-core = { path = "../core", version = "0.10.0" }
ballista-executor = { path = "../executor", version = "0.10.0", optional = true }
ballista-scheduler = { path = "../scheduler", version = "0.10.0", optional = true }
-datafusion = "16.1.0"
-datafusion-proto = "16.1.0"
+datafusion = "17.0.0"
+datafusion-proto = "17.0.0"
futures = "0.3"
log = "0.4"
parking_lot = "0.12"
diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml
index 27b28a5a..09272958 100644
--- a/ballista/core/Cargo.toml
+++ b/ballista/core/Cargo.toml
@@ -46,13 +46,13 @@ simd = ["datafusion/simd"]
[dependencies]
ahash = { version = "0.8", default-features = false }
-arrow-flight = { version = "29.0.0", features = ["flight-sql-experimental"] }
+arrow-flight = { version = "31.0.0", features = ["flight-sql-experimental"] }
async-trait = "0.1.41"
chrono = { version = "0.4", default-features = false }
clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = "16.1.0"
+datafusion = "17.0.0"
datafusion-objectstore-hdfs = { version = "0.1.1", default-features = false, optional = true }
-datafusion-proto = "16.1.0"
+datafusion-proto = "17.0.0"
futures = "0.3"
hashbrown = "0.13"
diff --git a/ballista/core/src/client.rs b/ballista/core/src/client.rs
index a9e616e8..d91c6fac 100644
--- a/ballista/core/src/client.rs
+++ b/ballista/core/src/client.rs
@@ -116,7 +116,7 @@ impl BallistaClient {
.encode(&mut buf)
.map_err(|e| BallistaError::GrpcActionError(format!("{e:?}")))?;
- let request = tonic::Request::new(Ticket { ticket: buf });
+ let request = tonic::Request::new(Ticket { ticket: buf.into() });
let mut stream = self
.flight_client
diff --git a/ballista/core/src/serde/generated/ballista.rs b/ballista/core/src/serde/generated/ballista.rs
index 3fc305ee..0c4de2e8 100644
--- a/ballista/core/src/serde/generated/ballista.rs
+++ b/ballista/core/src/serde/generated/ballista.rs
@@ -1,6 +1,7 @@
/// /////////////////////////////////////////////////////////////////////////////////////////////////
/// Ballista Physical Plan
/// /////////////////////////////////////////////////////////////////////////////////////////////////
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct BallistaPhysicalPlanNode {
#[prost(oneof = "ballista_physical_plan_node::PhysicalPlanType", tags = "1, 2, 3")]
@@ -10,6 +11,7 @@ pub struct BallistaPhysicalPlanNode {
}
/// Nested message and enum types in `BallistaPhysicalPlanNode`.
pub mod ballista_physical_plan_node {
+ #[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum PhysicalPlanType {
#[prost(message, tag = "1")]
@@ -20,6 +22,7 @@ pub mod ballista_physical_plan_node {
UnresolvedShuffle(super::UnresolvedShuffleExecNode),
}
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ShuffleWriterExecNode {
/// TODO it seems redundant to provide job and stage id here since we also have them
@@ -35,6 +38,7 @@ pub struct ShuffleWriterExecNode {
::datafusion_proto::protobuf::PhysicalHashRepartition,
>,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct UnresolvedShuffleExecNode {
#[prost(uint32, tag = "1")]
@@ -46,6 +50,7 @@ pub struct UnresolvedShuffleExecNode {
#[prost(uint32, tag = "4")]
pub output_partition_count: u32,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ShuffleReaderExecNode {
#[prost(message, repeated, tag = "1")]
@@ -53,6 +58,7 @@ pub struct ShuffleReaderExecNode {
#[prost(message, optional, tag = "2")]
pub schema: ::core::option::Option<::datafusion_proto::protobuf::Schema>,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ShuffleReaderPartition {
/// each partition of a shuffle read can read data from multiple locations
@@ -62,6 +68,7 @@ pub struct ShuffleReaderPartition {
/// /////////////////////////////////////////////////////////////////////////////////////////////////
/// Ballista Scheduling
/// /////////////////////////////////////////////////////////////////////////////////////////////////
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutionGraph {
#[prost(string, tag = "1")]
@@ -91,6 +98,7 @@ pub struct ExecutionGraph {
#[prost(uint64, tag = "13")]
pub queued_at: u64,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct StageAttempts {
#[prost(uint32, tag = "1")]
@@ -98,6 +106,7 @@ pub struct StageAttempts {
#[prost(uint32, repeated, tag = "2")]
pub stage_attempt_num: ::prost::alloc::vec::Vec<u32>,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutionGraphStage {
#[prost(oneof = "execution_graph_stage::StageType", tags = "1, 2, 3, 4")]
@@ -105,6 +114,7 @@ pub struct ExecutionGraphStage {
}
/// Nested message and enum types in `ExecutionGraphStage`.
pub mod execution_graph_stage {
+ #[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum StageType {
#[prost(message, tag = "1")]
@@ -117,6 +127,7 @@ pub mod execution_graph_stage {
FailedStage(super::FailedStage),
}
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct UnResolvedStage {
#[prost(uint32, tag = "1")]
@@ -138,6 +149,7 @@ pub struct UnResolvedStage {
::prost::alloc::string::String,
>,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ResolvedStage {
#[prost(uint32, tag = "1")]
@@ -161,6 +173,7 @@ pub struct ResolvedStage {
::prost::alloc::string::String,
>,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SuccessfulStage {
#[prost(uint32, tag = "1")]
@@ -184,6 +197,7 @@ pub struct SuccessfulStage {
#[prost(uint32, tag = "9")]
pub stage_attempt_num: u32,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FailedStage {
#[prost(uint32, tag = "1")]
@@ -207,6 +221,7 @@ pub struct FailedStage {
#[prost(uint32, tag = "9")]
pub stage_attempt_num: u32,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TaskInfo {
#[prost(uint32, tag = "1")]
@@ -233,6 +248,7 @@ pub struct TaskInfo {
}
/// Nested message and enum types in `TaskInfo`.
pub mod task_info {
+ #[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Status {
#[prost(message, tag = "8")]
@@ -243,6 +259,7 @@ pub mod task_info {
Successful(super::SuccessfulTask),
}
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GraphStageInput {
#[prost(uint32, tag = "1")]
@@ -252,6 +269,7 @@ pub struct GraphStageInput {
#[prost(bool, tag = "3")]
pub complete: bool,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TaskInputPartitions {
#[prost(uint32, tag = "1")]
@@ -259,6 +277,7 @@ pub struct TaskInputPartitions {
#[prost(message, repeated, tag = "2")]
pub partition_location: ::prost::alloc::vec::Vec<PartitionLocation>,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct KeyValuePair {
#[prost(string, tag = "1")]
@@ -266,6 +285,7 @@ pub struct KeyValuePair {
#[prost(string, tag = "2")]
pub value: ::prost::alloc::string::String,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Action {
/// configuration settings
@@ -276,6 +296,7 @@ pub struct Action {
}
/// Nested message and enum types in `Action`.
pub mod action {
+ #[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum ActionType {
/// Fetch a partition from an executor
@@ -283,6 +304,7 @@ pub mod action {
FetchPartition(super::FetchPartition),
}
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutePartition {
#[prost(string, tag = "1")]
@@ -302,6 +324,7 @@ pub struct ExecutePartition {
::datafusion_proto::protobuf::PhysicalHashRepartition,
>,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FetchPartition {
#[prost(string, tag = "1")]
@@ -317,6 +340,7 @@ pub struct FetchPartition {
#[prost(uint32, tag = "6")]
pub port: u32,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PartitionLocation {
/// partition_id of the map stage who produces the shuffle.
@@ -333,6 +357,7 @@ pub struct PartitionLocation {
pub path: ::prost::alloc::string::String,
}
/// Unique identifier for a materialized partition of data
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PartitionId {
#[prost(string, tag = "1")]
@@ -342,6 +367,7 @@ pub struct PartitionId {
#[prost(uint32, tag = "4")]
pub partition_id: u32,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TaskId {
#[prost(uint32, tag = "1")]
@@ -351,6 +377,7 @@ pub struct TaskId {
#[prost(uint32, tag = "3")]
pub partition_id: u32,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PartitionStats {
#[prost(int64, tag = "1")]
@@ -362,6 +389,7 @@ pub struct PartitionStats {
#[prost(message, repeated, tag = "4")]
pub column_stats: ::prost::alloc::vec::Vec<ColumnStats>,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ColumnStats {
#[prost(message, optional, tag = "1")]
@@ -373,11 +401,13 @@ pub struct ColumnStats {
#[prost(uint32, tag = "4")]
pub distinct_count: u32,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct OperatorMetricsSet {
#[prost(message, repeated, tag = "1")]
pub metrics: ::prost::alloc::vec::Vec<OperatorMetric>,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct NamedCount {
#[prost(string, tag = "1")]
@@ -385,6 +415,7 @@ pub struct NamedCount {
#[prost(uint64, tag = "2")]
pub value: u64,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct NamedGauge {
#[prost(string, tag = "1")]
@@ -392,6 +423,7 @@ pub struct NamedGauge {
#[prost(uint64, tag = "2")]
pub value: u64,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct NamedTime {
#[prost(string, tag = "1")]
@@ -399,6 +431,7 @@ pub struct NamedTime {
#[prost(uint64, tag = "2")]
pub value: u64,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct OperatorMetric {
#[prost(oneof = "operator_metric::Metric", tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10")]
@@ -406,6 +439,7 @@ pub struct OperatorMetric {
}
/// Nested message and enum types in `OperatorMetric`.
pub mod operator_metric {
+ #[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Metric {
#[prost(uint64, tag = "1")]
@@ -431,6 +465,7 @@ pub mod operator_metric {
}
}
/// Used by scheduler
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutorMetadata {
#[prost(string, tag = "1")]
@@ -445,6 +480,7 @@ pub struct ExecutorMetadata {
pub specification: ::core::option::Option<ExecutorSpecification>,
}
/// Used by grpc
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutorRegistration {
#[prost(string, tag = "1")]
@@ -464,12 +500,14 @@ pub struct ExecutorRegistration {
pub mod executor_registration {
/// "optional" keyword is stable in protoc 3.15 but prost is still on 3.14 (see <https://github.com/tokio-rs/prost/issues/430> and <https://github.com/tokio-rs/prost/pull/455>)
/// this syntax is ugly but is binary compatible with the "optional" keyword (see <https://stackoverflow.com/questions/42622015/how-to-define-an-optional-field-in-protobuf-3>)
+ #[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum OptionalHost {
#[prost(string, tag = "2")]
Host(::prost::alloc::string::String),
}
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutorHeartbeat {
#[prost(string, tag = "1")]
@@ -482,6 +520,7 @@ pub struct ExecutorHeartbeat {
#[prost(message, optional, tag = "4")]
pub status: ::core::option::Option<ExecutorStatus>,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutorMetric {
/// TODO add more metrics
@@ -491,12 +530,14 @@ pub struct ExecutorMetric {
/// Nested message and enum types in `ExecutorMetric`.
pub mod executor_metric {
/// TODO add more metrics
+ #[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Metric {
#[prost(uint64, tag = "1")]
AvailableMemory(u64),
}
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutorStatus {
#[prost(oneof = "executor_status::Status", tags = "1, 2, 3")]
@@ -504,6 +545,7 @@ pub struct ExecutorStatus {
}
/// Nested message and enum types in `ExecutorStatus`.
pub mod executor_status {
+ #[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Status {
#[prost(string, tag = "1")]
@@ -514,11 +556,13 @@ pub mod executor_status {
Unknown(::prost::alloc::string::String),
}
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutorSpecification {
#[prost(message, repeated, tag = "1")]
pub resources: ::prost::alloc::vec::Vec<ExecutorResource>,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutorResource {
/// TODO add more resources
@@ -528,12 +572,14 @@ pub struct ExecutorResource {
/// Nested message and enum types in `ExecutorResource`.
pub mod executor_resource {
/// TODO add more resources
+ #[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Resource {
#[prost(uint32, tag = "1")]
TaskSlots(u32),
}
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutorData {
#[prost(string, tag = "1")]
@@ -541,6 +587,7 @@ pub struct ExecutorData {
#[prost(message, repeated, tag = "2")]
pub resources: ::prost::alloc::vec::Vec<ExecutorResourcePair>,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutorResourcePair {
#[prost(message, optional, tag = "1")]
@@ -548,11 +595,13 @@ pub struct ExecutorResourcePair {
#[prost(message, optional, tag = "2")]
pub available: ::core::option::Option<ExecutorResource>,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct RunningTask {
#[prost(string, tag = "1")]
pub executor_id: ::prost::alloc::string::String,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FailedTask {
#[prost(string, tag = "1")]
@@ -567,6 +616,7 @@ pub struct FailedTask {
}
/// Nested message and enum types in `FailedTask`.
pub mod failed_task {
+ #[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum FailedReason {
#[prost(message, tag = "4")]
@@ -584,6 +634,7 @@ pub mod failed_task {
TaskKilled(super::TaskKilled),
}
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SuccessfulTask {
#[prost(string, tag = "1")]
@@ -593,8 +644,10 @@ pub struct SuccessfulTask {
#[prost(message, repeated, tag = "2")]
pub partitions: ::prost::alloc::vec::Vec<ShuffleWritePartition>,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutionError {}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FetchPartitionError {
#[prost(string, tag = "1")]
@@ -604,14 +657,19 @@ pub struct FetchPartitionError {
#[prost(uint32, tag = "3")]
pub map_partition_id: u32,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct IoError {}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutorLost {}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ResultLost {}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TaskKilled {}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ShuffleWritePartition {
#[prost(uint64, tag = "1")]
@@ -625,6 +683,7 @@ pub struct ShuffleWritePartition {
#[prost(uint64, tag = "5")]
pub num_bytes: u64,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TaskStatus {
#[prost(uint32, tag = "1")]
@@ -650,6 +709,7 @@ pub struct TaskStatus {
}
/// Nested message and enum types in `TaskStatus`.
pub mod task_status {
+ #[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Status {
#[prost(message, tag = "9")]
@@ -660,6 +720,7 @@ pub mod task_status {
Successful(super::SuccessfulTask),
}
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PollWorkParams {
#[prost(message, optional, tag = "1")]
@@ -670,6 +731,7 @@ pub struct PollWorkParams {
#[prost(message, repeated, tag = "3")]
pub task_status: ::prost::alloc::vec::Vec<TaskStatus>,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TaskDefinition {
#[prost(uint32, tag = "1")]
@@ -699,6 +761,7 @@ pub struct TaskDefinition {
pub props: ::prost::alloc::vec::Vec<KeyValuePair>,
}
/// A set of tasks in the same stage
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct MultiTaskDefinition {
#[prost(message, repeated, tag = "1")]
@@ -723,11 +786,13 @@ pub struct MultiTaskDefinition {
#[prost(message, repeated, tag = "9")]
pub props: ::prost::alloc::vec::Vec<KeyValuePair>,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SessionSettings {
#[prost(message, repeated, tag = "1")]
pub configs: ::prost::alloc::vec::Vec<KeyValuePair>,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct JobSessionConfig {
#[prost(string, tag = "1")]
@@ -735,21 +800,25 @@ pub struct JobSessionConfig {
#[prost(message, repeated, tag = "2")]
pub configs: ::prost::alloc::vec::Vec<KeyValuePair>,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PollWorkResult {
#[prost(message, repeated, tag = "1")]
pub tasks: ::prost::alloc::vec::Vec<TaskDefinition>,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct RegisterExecutorParams {
#[prost(message, optional, tag = "1")]
pub metadata: ::core::option::Option<ExecutorRegistration>,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct RegisterExecutorResult {
#[prost(bool, tag = "1")]
pub success: bool,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct HeartBeatParams {
#[prost(string, tag = "1")]
@@ -759,12 +828,14 @@ pub struct HeartBeatParams {
#[prost(message, optional, tag = "3")]
pub status: ::core::option::Option<ExecutorStatus>,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct HeartBeatResult {
/// TODO it's from Spark for BlockManager
#[prost(bool, tag = "1")]
pub reregister: bool,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct StopExecutorParams {
#[prost(string, tag = "1")]
@@ -776,8 +847,10 @@ pub struct StopExecutorParams {
#[prost(bool, tag = "3")]
pub force: bool,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct StopExecutorResult {}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutorStoppedParams {
#[prost(string, tag = "1")]
@@ -786,8 +859,10 @@ pub struct ExecutorStoppedParams {
#[prost(string, tag = "2")]
pub reason: ::prost::alloc::string::String,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutorStoppedResult {}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct UpdateTaskStatusParams {
#[prost(string, tag = "1")]
@@ -796,11 +871,13 @@ pub struct UpdateTaskStatusParams {
#[prost(message, repeated, tag = "2")]
pub task_status: ::prost::alloc::vec::Vec<TaskStatus>,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct UpdateTaskStatusResult {
#[prost(bool, tag = "1")]
pub success: bool,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecuteQueryParams {
#[prost(message, repeated, tag = "4")]
@@ -814,6 +891,7 @@ pub struct ExecuteQueryParams {
}
/// Nested message and enum types in `ExecuteQueryParams`.
pub mod execute_query_params {
+ #[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Query {
#[prost(bytes, tag = "1")]
@@ -821,17 +899,20 @@ pub mod execute_query_params {
#[prost(string, tag = "2")]
Sql(::prost::alloc::string::String),
}
+ #[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum OptionalSessionId {
#[prost(string, tag = "3")]
SessionId(::prost::alloc::string::String),
}
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecuteSqlParams {
#[prost(string, tag = "1")]
pub sql: ::prost::alloc::string::String,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecuteQueryResult {
#[prost(string, tag = "1")]
@@ -839,26 +920,32 @@ pub struct ExecuteQueryResult {
#[prost(string, tag = "2")]
pub session_id: ::prost::alloc::string::String,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetJobStatusParams {
#[prost(string, tag = "1")]
pub job_id: ::prost::alloc::string::String,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SuccessfulJob {
#[prost(message, repeated, tag = "1")]
pub partition_location: ::prost::alloc::vec::Vec<PartitionLocation>,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct QueuedJob {}
/// TODO: add progress report
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct RunningJob {}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FailedJob {
#[prost(string, tag = "1")]
pub error: ::prost::alloc::string::String,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct JobStatus {
#[prost(oneof = "job_status::Status", tags = "1, 2, 3, 4")]
@@ -866,6 +953,7 @@ pub struct JobStatus {
}
/// Nested message and enum types in `JobStatus`.
pub mod job_status {
+ #[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Status {
#[prost(message, tag = "1")]
@@ -878,11 +966,13 @@ pub mod job_status {
Successful(super::SuccessfulJob),
}
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetJobStatusResult {
#[prost(message, optional, tag = "1")]
pub status: ::core::option::Option<JobStatus>,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetFileMetadataParams {
#[prost(string, tag = "1")]
@@ -890,33 +980,40 @@ pub struct GetFileMetadataParams {
#[prost(string, tag = "2")]
pub file_type: ::prost::alloc::string::String,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetFileMetadataResult {
#[prost(message, optional, tag = "1")]
pub schema: ::core::option::Option<::datafusion_proto::protobuf::Schema>,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FilePartitionMetadata {
#[prost(string, repeated, tag = "1")]
pub filename: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CancelJobParams {
#[prost(string, tag = "1")]
pub job_id: ::prost::alloc::string::String,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CancelJobResult {
#[prost(bool, tag = "1")]
pub cancelled: bool,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CleanJobDataParams {
#[prost(string, tag = "1")]
pub job_id: ::prost::alloc::string::String,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CleanJobDataResult {}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct LaunchTaskParams {
/// Allow to launch a task set to an executor at once
@@ -925,6 +1022,7 @@ pub struct LaunchTaskParams {
#[prost(string, tag = "2")]
pub scheduler_id: ::prost::alloc::string::String,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct LaunchMultiTaskParams {
/// Allow to launch a task set to an executor at once
@@ -933,35 +1031,42 @@ pub struct LaunchMultiTaskParams {
#[prost(string, tag = "2")]
pub scheduler_id: ::prost::alloc::string::String,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct LaunchTaskResult {
/// TODO when part of the task set are scheduled successfully
#[prost(bool, tag = "1")]
pub success: bool,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct LaunchMultiTaskResult {
/// TODO when part of the task set are scheduled successfully
#[prost(bool, tag = "1")]
pub success: bool,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CancelTasksParams {
#[prost(message, repeated, tag = "1")]
pub task_infos: ::prost::alloc::vec::Vec<RunningTaskInfo>,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CancelTasksResult {
#[prost(bool, tag = "1")]
pub cancelled: bool,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct RemoveJobDataParams {
#[prost(string, tag = "1")]
pub job_id: ::prost::alloc::string::String,
}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct RemoveJobDataResult {}
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct RunningTaskInfo {
#[prost(uint32, tag = "1")]
diff --git a/ballista/core/src/serde/mod.rs b/ballista/core/src/serde/mod.rs
index 96fdbfc6..1b277018 100644
--- a/ballista/core/src/serde/mod.rs
+++ b/ballista/core/src/serde/mod.rs
@@ -52,10 +52,10 @@ impl ProstMessageExt for protobuf::Action {
"type.googleapis.com/arrow.flight.protocol.sql.Action"
}
- fn as_any(&self) -> prost_types::Any {
- prost_types::Any {
+ fn as_any(&self) -> arrow_flight::sql::Any {
+ arrow_flight::sql::Any {
type_url: protobuf::Action::type_url().to_string(),
- value: self.encode_to_vec(),
+ value: self.encode_to_vec().into(),
}
}
}
diff --git a/ballista/executor/Cargo.toml b/ballista/executor/Cargo.toml
index 24fe4a3a..7849d1e5 100644
--- a/ballista/executor/Cargo.toml
+++ b/ballista/executor/Cargo.toml
@@ -38,15 +38,15 @@ default = ["mimalloc"]
[dependencies]
anyhow = "1"
-arrow = { version = "29.0.0" }
-arrow-flight = { version = "29.0.0" }
+arrow = { version = "31.0.0" }
+arrow-flight = { version = "31.0.0" }
async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.10.0" }
chrono = { version = "0.4", default-features = false }
configure_me = "0.4.0"
dashmap = "5.4.0"
-datafusion = "16.1.0"
-datafusion-proto = "16.1.0"
+datafusion = "17.0.0"
+datafusion-proto = "17.0.0"
futures = "0.3"
hyper = "0.14.4"
log = "0.4"
diff --git a/ballista/executor/src/flight_service.rs b/ballista/executor/src/flight_service.rs
index c2310b32..4a62bc5f 100644
--- a/ballista/executor/src/flight_service.rs
+++ b/ballista/executor/src/flight_service.rs
@@ -26,14 +26,14 @@ use ballista_core::error::BallistaError;
use ballista_core::serde::decode_protobuf;
use ballista_core::serde::scheduler::Action as BallistaAction;
+use arrow::ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
use arrow_flight::{
flight_service_server::FlightService, Action, ActionType, Criteria, Empty,
FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse,
PutResult, SchemaResult, Ticket,
};
use datafusion::arrow::{
- error::ArrowError, ipc::reader::FileReader, ipc::writer::IpcWriteOptions,
- record_batch::RecordBatch,
+ error::ArrowError, ipc::reader::FileReader, record_batch::RecordBatch,
};
use futures::{Stream, StreamExt};
use log::{debug, info, warn};
@@ -142,7 +142,7 @@ impl FlightService for BallistaFlightService {
let result = HandshakeResponse {
protocol_version: 0,
- payload: token.as_bytes().to_vec(),
+ payload: token.as_bytes().to_vec().into(),
};
let result = Ok(result);
let output = futures::stream::iter(vec![result]);
@@ -182,8 +182,7 @@ impl FlightService for BallistaFlightService {
) -> Result<Response<Self::DoActionStream>, Status> {
let action = request.into_inner();
- let _action =
- decode_protobuf(&action.body.to_vec()).map_err(|e| from_ballista_err(&e))?;
+ let _action = decode_protobuf(&action.body).map_err(|e| from_ballista_err(&e))?;
Err(Status::unimplemented("do_action"))
}
@@ -209,14 +208,19 @@ fn create_flight_iter(
batch: &RecordBatch,
options: &IpcWriteOptions,
) -> Box<dyn Iterator<Item = Result<FlightData, Status>>> {
- let (flight_dictionaries, flight_batch) =
- arrow_flight::utils::flight_data_from_arrow_batch(batch, options);
- Box::new(
- flight_dictionaries
- .into_iter()
- .chain(std::iter::once(flight_batch))
- .map(Ok),
- )
+ let data_gen = IpcDataGenerator::default();
+ let mut dictionary_tracker = DictionaryTracker::new(false);
+ let res = data_gen.encoded_batch(batch, &mut dictionary_tracker, options);
+ match res {
+ Ok((dicts, batch)) => {
+ let flights = dicts
+ .into_iter()
+ .chain(std::iter::once(batch))
+ .map(|x| x.into());
+ Box::new(flights.map(Ok))
+ }
+ Err(e) => Box::new(std::iter::once(Err(from_arrow_err(&e)))),
+ }
}
async fn stream_flight_data<T>(
diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml
index d3ab965f..64da40d7 100644
--- a/ballista/scheduler/Cargo.toml
+++ b/ballista/scheduler/Cargo.toml
@@ -43,7 +43,7 @@ sled = ["sled_package", "tokio-stream"]
[dependencies]
anyhow = "1"
-arrow-flight = { version = "29.0.0", features = ["flight-sql-experimental"] }
+arrow-flight = { version = "31.0.0", features = ["flight-sql-experimental"] }
async-recursion = "1.0.0"
async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.10.0" }
@@ -51,8 +51,8 @@ base64 = { version = "0.13", default-features = false }
clap = { version = "3", features = ["derive", "cargo"] }
configure_me = "0.4.0"
dashmap = "5.4.0"
-datafusion = "16.1.0"
-datafusion-proto = "16.1.0"
+datafusion = "17.0.0"
+datafusion-proto = "17.0.0"
etcd-client = { version = "0.10", optional = true }
flatbuffers = { version = "22.9.29" }
futures = "0.3"
diff --git a/ballista/scheduler/src/flight_sql.rs b/ballista/scheduler/src/flight_sql.rs
index 46187e9a..ca755fd8 100644
--- a/ballista/scheduler/src/flight_sql.rs
+++ b/ballista/scheduler/src/flight_sql.rs
@@ -24,7 +24,7 @@ use arrow_flight::sql::{
CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
CommandPreparedStatementQuery, CommandPreparedStatementUpdate, CommandStatementQuery,
- CommandStatementUpdate, ProstAnyExt, SqlInfo, TicketStatementQuery,
+ CommandStatementUpdate, SqlInfo, TicketStatementQuery,
};
use arrow_flight::{
Action, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest,
@@ -42,7 +42,7 @@ use tonic::{Request, Response, Status, Streaming};
use crate::scheduler_server::SchedulerServer;
use arrow_flight::flight_service_client::FlightServiceClient;
use arrow_flight::sql::ProstMessageExt;
-use arrow_flight::utils::flight_data_from_arrow_batch;
+use arrow_flight::utils::batches_to_flight_data;
use arrow_flight::SchemaAsIpc;
use ballista_core::config::BallistaConfig;
use ballista_core::serde::protobuf;
@@ -289,7 +289,7 @@ impl FlightSqlServiceImpl {
uri: format!("grpc+tcp://{authority}"),
};
let buf = fetch.as_any().encode_to_vec();
- let ticket = Ticket { ticket: buf };
+ let ticket = Ticket { ticket: buf.into() };
let fiep = FlightEndpoint {
ticket: Some(ticket),
location: vec![loc],
@@ -318,7 +318,7 @@ impl FlightSqlServiceImpl {
uri: format!("grpc+tcp://{authority}"),
};
let buf = fetch.as_any().encode_to_vec();
- let ticket = Ticket { ticket: buf };
+ let ticket = Ticket { ticket: buf.into() };
let fiep = FlightEndpoint {
ticket: Some(ticket),
location: vec![loc],
@@ -391,11 +391,11 @@ impl FlightSqlServiceImpl {
) -> Response<FlightInfo> {
let flight_desc = FlightDescriptor {
r#type: DescriptorType::Cmd.into(),
- cmd: vec![],
+ cmd: Vec::new().into(),
path: vec![],
};
let info = FlightInfo {
- schema: schema_bytes,
+ schema: schema_bytes.into(),
flight_descriptor: Some(flight_desc),
endpoint: fieps,
total_records: num_rows,
@@ -434,20 +434,16 @@ impl FlightSqlServiceImpl {
}
async fn record_batch_to_resp(
- rb: &RecordBatch,
+ rb: RecordBatch,
) -> Result<
Response<Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send>>>,
Status,
> {
type FlightResult = Result<FlightData, Status>;
let (tx, rx): (Sender<FlightResult>, Receiver<FlightResult>) = channel(2);
- let options = IpcWriteOptions::default();
- let schema = SchemaAsIpc::new(rb.schema().as_ref(), &options).into();
- tx.send(Ok(schema))
- .await
- .map_err(|_| Status::internal("Error sending schema".to_string()))?;
- let (dict, flight) = flight_data_from_arrow_batch(rb, &options);
- let flights = dict.into_iter().chain(std::iter::once(flight));
+ let schema = (*rb.schema()).clone();
+ let flights = batches_to_flight_data(schema, vec![rb])
+ .map_err(|_| Status::internal("Error encoding batches".to_string()))?;
for flight in flights {
tx.send(Ok(flight))
.await
@@ -521,7 +517,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
let result = HandshakeResponse {
protocol_version: 0,
- payload: token.as_bytes().to_vec(),
+ payload: token.as_bytes().to_vec().into(),
};
let result = Ok(result);
let output = futures::stream::iter(vec![result]);
@@ -537,7 +533,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
async fn do_get_fallback(
&self,
request: Request<Ticket>,
- message: prost_types::Any,
+ message: arrow_flight::sql::Any,
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
debug!("do_get_fallback type_url: {}", message.type_url);
let ctx = self.get_ctx(&request)?;
@@ -564,7 +560,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
let rb = FlightSqlServiceImpl::table_types().map_err(|_| {
Status::internal("Error getting table types".to_string())
})?;
- let resp = Self::record_batch_to_resp(&rb).await?;
+ let resp = Self::record_batch_to_resp(rb).await?;
return Ok(resp);
}
"get_flight_info_tables" => {
@@ -572,7 +568,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
let rb = self
.tables(ctx)
.map_err(|_| Status::internal("Error getting tables".to_string()))?;
- let resp = Self::record_batch_to_resp(&rb).await?;
+ let resp = Self::record_batch_to_resp(rb).await?;
return Ok(resp);
}
_ => {}
@@ -591,7 +587,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
})?;
let mut flight_client = FlightServiceClient::new(connection);
let buf = action.encode_to_vec();
- let request = Request::new(Ticket { ticket: buf });
+ let request = Request::new(Ticket { ticket: buf.into() });
let stream = flight_client
.do_get(request)
@@ -623,7 +619,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
) -> Result<Response<FlightInfo>, Status> {
debug!("get_flight_info_prepared_statement");
let ctx = self.get_ctx(&request)?;
- let handle = Uuid::from_slice(handle.prepared_statement_handle.as_slice())
+ let handle = Uuid::from_slice(handle.prepared_statement_handle.as_ref())
.map_err(|e| Status::internal(format!("Error decoding handle: {e}")))?;
let plan = self.get_plan(&handle)?;
let resp = self.execute_plan(ctx, &plan).await?;
@@ -845,7 +841,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
) -> Result<i64, Status> {
debug!("do_put_prepared_statement_update");
let ctx = self.get_ctx(&request)?;
- let handle = Uuid::from_slice(handle.prepared_statement_handle.as_slice())
+ let handle = Uuid::from_slice(handle.prepared_statement_handle.as_ref())
.map_err(|e| Status::internal(format!("Error decoding handle: {e}")))?;
let plan = self.get_plan(&handle)?;
let _ = self.execute_plan(ctx, &plan).await?;
@@ -865,9 +861,9 @@ impl FlightSqlService for FlightSqlServiceImpl {
let handle = self.cache_plan(plan)?;
debug!("Prepared statement {}:\n{}", handle, query.query);
let res = ActionCreatePreparedStatementResult {
- prepared_statement_handle: handle.as_bytes().to_vec(),
- dataset_schema: schema_bytes,
- parameter_schema: vec![], // TODO: parameters
+ prepared_statement_handle: handle.as_bytes().to_vec().into(),
+ dataset_schema: schema_bytes.into(),
+ parameter_schema: Vec::new().into(), // TODO: parameters
};
Ok(res)
}
@@ -878,7 +874,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
_request: Request<Action>,
) {
debug!("do_action_close_prepared_statement");
- let handle = Uuid::from_slice(handle.prepared_statement_handle.as_slice());
+ let handle = Uuid::from_slice(handle.prepared_statement_handle.as_ref());
let handle = if let Ok(handle) = handle {
debug!("Closing {}", handle);
handle
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 3fa9e0a5..0f437d07 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -34,8 +34,8 @@ snmalloc = ["snmalloc-rs"]
[dependencies]
ballista = { path = "../ballista/client", version = "0.10.0" }
-datafusion = "16.1.0"
-datafusion-proto = "16.1.0"
+datafusion = "17.0.0"
+datafusion-proto = "17.0.0"
env_logger = "0.10"
futures = "0.3"
mimalloc = { version = "0.1", optional = true, default-features = false }
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index e6f65708..40b2ccb9 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -1553,6 +1553,7 @@ mod tests {
use super::*;
use ballista_core::serde::BallistaCodec;
use datafusion::datasource::listing::ListingTableUrl;
+ use datafusion::execution::options::ReadOptions;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_proto::logical_plan::AsLogicalPlan;
use datafusion_proto::physical_plan::AsExecutionPlan;
@@ -1582,7 +1583,8 @@ mod tests {
.delimiter(b'|')
.has_header(false)
.file_extension(".tbl");
- let listing_options = options.to_listing_options(1);
+ let cfg = SessionConfig::new();
+ let listing_options = options.to_listing_options(&cfg);
let config = ListingTableConfig::new(path.clone())
.with_listing_options(listing_options)
.with_schema(Arc::new(schema));
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index e38a58ec..18fbaad5 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -35,7 +35,7 @@ required-features = ["ballista/standalone"]
[dependencies]
ballista = { path = "../ballista/client", version = "0.10.0" }
-datafusion = "16.1.0"
+datafusion = "17.0.0"
futures = "0.3"
num_cpus = "1.13.0"
prost = "0.11"
diff --git a/python/Cargo.toml b/python/Cargo.toml
index 59a9f847..9d7d81d9 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -36,9 +36,9 @@ default = ["mimalloc"]
[dependencies]
async-trait = "0.1"
ballista = { path = "../ballista/client", version = "0.10.0" }
-datafusion = { version = "16.1.0", features = ["pyarrow"] }
-datafusion-common = "16.1.0"
-datafusion-expr = "16.1.0"
+datafusion = { version = "17.0.0", features = ["pyarrow"] }
+datafusion-common = "17.0.0"
+datafusion-expr = "17.0.0"
futures = "0.3"
mimalloc = { version = "*", optional = true, default-features = false }
pyo3 = { version = "~0.17.1", features = ["extension-module", "abi3", "abi3-py37"] }