You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/11/18 13:43:06 UTC
[arrow-ballista] branch master updated: fix python build in CI (#528)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new f3e7e49b fix python build in CI (#528)
f3e7e49b is described below
commit f3e7e49ba6ce058e69ef4a94a2f6573cfba27f9e
Author: Andy Grove <an...@gmail.com>
AuthorDate: Fri Nov 18 06:43:01 2022 -0700
fix python build in CI (#528)
* fix python build in CI
* save progress
* use same min rust version in all crates
* fix
* use image from pyo3
* use newer image from pyo3
* do not require protoc
* wheels now generated
* rat - exclude generated file
---
.github/workflows/python_build.yml | 16 +-
ballista-cli/Cargo.toml | 2 +-
ballista/client/Cargo.toml | 2 +-
ballista/core/Cargo.toml | 3 +
ballista/core/build.rs | 35 +-
ballista/core/src/serde/generated/.gitignore | 4 -
ballista/core/src/serde/generated/ballista.rs | 2840 +++++++++++++++++++++++++
benchmarks/Cargo.toml | 2 +-
dev/release/rat_exclude_files.txt | 1 +
examples/Cargo.toml | 2 +-
10 files changed, 2872 insertions(+), 35 deletions(-)
diff --git a/.github/workflows/python_build.yml b/.github/workflows/python_build.yml
index 88e829d2..f069d719 100644
--- a/.github/workflows/python_build.yml
+++ b/.github/workflows/python_build.yml
@@ -94,16 +94,6 @@ jobs:
steps:
- uses: actions/checkout@v3
- run: rm LICENSE.txt
- - name: Install protobuf compiler
- shell: bash
- run: |
- mkdir -p $HOME/d/protoc
- cd $HOME/d/protoc
- export PROTO_ZIP="protoc-21.4-linux-x86_64.zip"
- curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v21.4/$PROTO_ZIP
- unzip $PROTO_ZIP
- export PATH=$PATH:$HOME/d/protoc/bin
- protoc --version
- name: Download LICENSE.txt
uses: actions/download-artifact@v3
with:
@@ -112,12 +102,12 @@ jobs:
- run: cat LICENSE.txt
- name: Build wheels
run: |
- export PATH=$PATH:$HOME/d/protoc/bin
export RUSTFLAGS='-C target-cpu=skylake'
+ rm ../ballista/core/proto/*
docker run --rm -v $(pwd)/..:/io \
--workdir /io/python \
- konstin2/maturin:v0.11.2 \
- build --release --manylinux 2010
+ ghcr.io/pyo3/maturin:v0.13.7 \
+ build --release --manylinux 2010 --locked
- name: Archive wheels
uses: actions/upload-artifact@v3
with:
diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index 1b2d2227..ea3f05e4 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -25,7 +25,7 @@ keywords = ["ballista", "cli"]
license = "Apache-2.0"
homepage = "https://github.com/apache/arrow-ballista"
repository = "https://github.com/apache/arrow-ballista"
-rust-version = "1.59"
+rust-version = "1.63"
readme = "README.md"
[dependencies]
diff --git a/ballista/client/Cargo.toml b/ballista/client/Cargo.toml
index a787dbd7..a429ca2d 100644
--- a/ballista/client/Cargo.toml
+++ b/ballista/client/Cargo.toml
@@ -25,7 +25,7 @@ repository = "https://github.com/apache/arrow-ballista"
readme = "README.md"
authors = ["Apache Arrow <de...@arrow.apache.org>"]
edition = "2021"
-rust-version = "1.59"
+rust-version = "1.63"
[dependencies]
ballista-core = { path = "../core", version = "0.10.0" }
diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml
index 8d64842a..20570a45 100644
--- a/ballista/core/Cargo.toml
+++ b/ballista/core/Cargo.toml
@@ -27,6 +27,9 @@ authors = ["Apache Arrow <de...@arrow.apache.org>"]
edition = "2018"
build = "build.rs"
+# Exclude proto files so crates.io consumers don't need protoc
+exclude = ["*.proto"]
+
[package.metadata.docs.rs]
rustc-args = ["--cfg", "docsrs"]
diff --git a/ballista/core/build.rs b/ballista/core/build.rs
index ab5d050d..0fdbdc38 100644
--- a/ballista/core/build.rs
+++ b/ballista/core/build.rs
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+use std::path::Path;
+
fn main() -> Result<(), String> {
use std::io::Write;
@@ -23,14 +25,8 @@ fn main() -> Result<(), String> {
// for use in docker build where file changes can be wonky
println!("cargo:rerun-if-env-changed=FORCE_REBUILD");
- println!("cargo:rerun-if-changed=proto/ballista.proto");
let version = rustc_version::version().unwrap();
println!("cargo:rustc-env=RUSTC_VERSION={}", version);
- println!("cargo:rerun-if-changed=proto/datafusion.proto");
- tonic_build::configure()
- .extern_path(".datafusion", "::datafusion_proto::protobuf")
- .compile(&["proto/ballista.proto"], &["proto"])
- .map_err(|e| format!("protobuf compilation failed: {}", e))?;
// TODO: undo when resolved: https://github.com/intellij-rust/intellij-rust/issues/9402
#[cfg(feature = "docsrs")]
@@ -38,14 +34,25 @@ fn main() -> Result<(), String> {
#[cfg(not(feature = "docsrs"))]
let path = "src/serde/generated/ballista.rs";
- let code = std::fs::read_to_string(out.join("ballista.protobuf.rs")).unwrap();
- let mut file = std::fs::OpenOptions::new()
- .write(true)
- .truncate(true)
- .create(true)
- .open(path)
- .unwrap();
- file.write_all(code.as_str().as_ref()).unwrap();
+ // We don't include the proto files in releases so that downstreams
+ // do not need to have PROTOC included
+ if Path::new("proto/datafusion.proto").exists() {
+ println!("cargo:rerun-if-changed=proto/datafusion.proto");
+ println!("cargo:rerun-if-changed=proto/ballista.proto");
+ tonic_build::configure()
+ .extern_path(".datafusion", "::datafusion_proto::protobuf")
+ .compile(&["proto/ballista.proto"], &["proto"])
+ .map_err(|e| format!("protobuf compilation failed: {}", e))?;
+ let generated_source_path = out.join("ballista.protobuf.rs");
+ let code = std::fs::read_to_string(generated_source_path).unwrap();
+ let mut file = std::fs::OpenOptions::new()
+ .write(true)
+ .truncate(true)
+ .create(true)
+ .open(path)
+ .unwrap();
+ file.write_all(code.as_str().as_ref()).unwrap();
+ }
Ok(())
}
diff --git a/ballista/core/src/serde/generated/.gitignore b/ballista/core/src/serde/generated/.gitignore
deleted file mode 100644
index 42eb8bcd..00000000
--- a/ballista/core/src/serde/generated/.gitignore
+++ /dev/null
@@ -1,4 +0,0 @@
-*
-
-!.gitignore
-!mod.rs
diff --git a/ballista/core/src/serde/generated/ballista.rs b/ballista/core/src/serde/generated/ballista.rs
new file mode 100644
index 00000000..7f9685e8
--- /dev/null
+++ b/ballista/core/src/serde/generated/ballista.rs
@@ -0,0 +1,2840 @@
+// /////////////////////////////////////////////////////////////////////////////////////////////////
+// Ballista Logical Plan
+// /////////////////////////////////////////////////////////////////////////////////////////////////
+
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct Statistics {
+ #[prost(int64, tag="1")]
+ pub num_rows: i64,
+ #[prost(int64, tag="2")]
+ pub total_byte_size: i64,
+ #[prost(message, repeated, tag="3")]
+ pub column_stats: ::prost::alloc::vec::Vec<ColumnStats>,
+ #[prost(bool, tag="4")]
+ pub is_exact: bool,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct FileRange {
+ #[prost(int64, tag="1")]
+ pub start: i64,
+ #[prost(int64, tag="2")]
+ pub end: i64,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PartitionedFile {
+ #[prost(string, tag="1")]
+ pub path: ::prost::alloc::string::String,
+ #[prost(uint64, tag="2")]
+ pub size: u64,
+ #[prost(uint64, tag="3")]
+ pub last_modified_ns: u64,
+ #[prost(message, repeated, tag="4")]
+ pub partition_values: ::prost::alloc::vec::Vec<::datafusion_proto::protobuf::ScalarValue>,
+ #[prost(message, optional, tag="5")]
+ pub range: ::core::option::Option<FileRange>,
+}
+// /////////////////////////////////////////////////////////////////////////////////////////////////
+// Ballista Physical Plan
+// /////////////////////////////////////////////////////////////////////////////////////////////////
+
+/// PhysicalPlanNode is a nested type
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PhysicalPlanNode {
+ #[prost(oneof="physical_plan_node::PhysicalPlanType", tags="1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24")]
+ pub physical_plan_type: ::core::option::Option<physical_plan_node::PhysicalPlanType>,
+}
+/// Nested message and enum types in `PhysicalPlanNode`.
+pub mod physical_plan_node {
+ #[derive(Clone, PartialEq, ::prost::Oneof)]
+ pub enum PhysicalPlanType {
+ #[prost(message, tag="1")]
+ ParquetScan(super::ParquetScanExecNode),
+ #[prost(message, tag="2")]
+ CsvScan(super::CsvScanExecNode),
+ #[prost(message, tag="3")]
+ Empty(super::EmptyExecNode),
+ #[prost(message, tag="4")]
+ Projection(::prost::alloc::boxed::Box<super::ProjectionExecNode>),
+ #[prost(message, tag="6")]
+ GlobalLimit(::prost::alloc::boxed::Box<super::GlobalLimitExecNode>),
+ #[prost(message, tag="7")]
+ LocalLimit(::prost::alloc::boxed::Box<super::LocalLimitExecNode>),
+ #[prost(message, tag="8")]
+ Aggregate(::prost::alloc::boxed::Box<super::AggregateExecNode>),
+ #[prost(message, tag="9")]
+ HashJoin(::prost::alloc::boxed::Box<super::HashJoinExecNode>),
+ #[prost(message, tag="10")]
+ ShuffleReader(super::ShuffleReaderExecNode),
+ #[prost(message, tag="11")]
+ Sort(::prost::alloc::boxed::Box<super::SortExecNode>),
+ #[prost(message, tag="12")]
+ CoalesceBatches(::prost::alloc::boxed::Box<super::CoalesceBatchesExecNode>),
+ #[prost(message, tag="13")]
+ Filter(::prost::alloc::boxed::Box<super::FilterExecNode>),
+ #[prost(message, tag="14")]
+ Merge(::prost::alloc::boxed::Box<super::CoalescePartitionsExecNode>),
+ #[prost(message, tag="15")]
+ Unresolved(super::UnresolvedShuffleExecNode),
+ #[prost(message, tag="16")]
+ Repartition(::prost::alloc::boxed::Box<super::RepartitionExecNode>),
+ #[prost(message, tag="17")]
+ Window(::prost::alloc::boxed::Box<super::WindowAggExecNode>),
+ #[prost(message, tag="18")]
+ ShuffleWriter(::prost::alloc::boxed::Box<super::ShuffleWriterExecNode>),
+ #[prost(message, tag="19")]
+ CrossJoin(::prost::alloc::boxed::Box<super::CrossJoinExecNode>),
+ #[prost(message, tag="20")]
+ AvroScan(super::AvroScanExecNode),
+ #[prost(message, tag="21")]
+ Extension(super::PhysicalExtensionNode),
+ #[prost(message, tag="22")]
+ Union(super::UnionExecNode),
+ #[prost(message, tag="23")]
+ Explain(super::ExplainExecNode),
+ #[prost(message, tag="24")]
+ SortPreservingMerge(::prost::alloc::boxed::Box<super::SortPreservingMergeExecNode>),
+ }
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PhysicalExtensionNode {
+ #[prost(bytes="vec", tag="1")]
+ pub node: ::prost::alloc::vec::Vec<u8>,
+ #[prost(message, repeated, tag="2")]
+ pub inputs: ::prost::alloc::vec::Vec<PhysicalPlanNode>,
+}
+/// physical expressions
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PhysicalExprNode {
+ #[prost(oneof="physical_expr_node::ExprType", tags="1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17")]
+ pub expr_type: ::core::option::Option<physical_expr_node::ExprType>,
+}
+/// Nested message and enum types in `PhysicalExprNode`.
+pub mod physical_expr_node {
+ #[derive(Clone, PartialEq, ::prost::Oneof)]
+ pub enum ExprType {
+ /// column references
+ #[prost(message, tag="1")]
+ Column(super::PhysicalColumn),
+ #[prost(message, tag="2")]
+ Literal(::datafusion_proto::protobuf::ScalarValue),
+ /// binary expressions
+ #[prost(message, tag="3")]
+ BinaryExpr(::prost::alloc::boxed::Box<super::PhysicalBinaryExprNode>),
+ /// aggregate expressions
+ #[prost(message, tag="4")]
+ AggregateExpr(super::PhysicalAggregateExprNode),
+ /// null checks
+ #[prost(message, tag="5")]
+ IsNullExpr(::prost::alloc::boxed::Box<super::PhysicalIsNull>),
+ #[prost(message, tag="6")]
+ IsNotNullExpr(::prost::alloc::boxed::Box<super::PhysicalIsNotNull>),
+ #[prost(message, tag="7")]
+ NotExpr(::prost::alloc::boxed::Box<super::PhysicalNot>),
+ #[prost(message, tag="8")]
+ Case(::prost::alloc::boxed::Box<super::PhysicalCaseNode>),
+ #[prost(message, tag="9")]
+ Cast(::prost::alloc::boxed::Box<super::PhysicalCastNode>),
+ #[prost(message, tag="10")]
+ Sort(::prost::alloc::boxed::Box<super::PhysicalSortExprNode>),
+ #[prost(message, tag="11")]
+ Negative(::prost::alloc::boxed::Box<super::PhysicalNegativeNode>),
+ #[prost(message, tag="12")]
+ InList(::prost::alloc::boxed::Box<super::PhysicalInListNode>),
+ #[prost(message, tag="13")]
+ ScalarFunction(super::PhysicalScalarFunctionNode),
+ #[prost(message, tag="14")]
+ TryCast(::prost::alloc::boxed::Box<super::PhysicalTryCastNode>),
+ /// window expressions
+ #[prost(message, tag="15")]
+ WindowExpr(::prost::alloc::boxed::Box<super::PhysicalWindowExprNode>),
+ #[prost(message, tag="16")]
+ ScalarUdf(super::PhysicalScalarUdfNode),
+ #[prost(message, tag="17")]
+ DateTimeIntervalExpr(::prost::alloc::boxed::Box<super::PhysicalDateTimeIntervalExprNode>),
+ }
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PhysicalScalarUdfNode {
+ #[prost(string, tag="1")]
+ pub name: ::prost::alloc::string::String,
+ #[prost(message, repeated, tag="2")]
+ pub args: ::prost::alloc::vec::Vec<PhysicalExprNode>,
+ #[prost(message, optional, tag="4")]
+ pub return_type: ::core::option::Option<::datafusion_proto::protobuf::ArrowType>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PhysicalAggregateExprNode {
+ #[prost(enumeration="::datafusion_proto::protobuf::AggregateFunction", tag="1")]
+ pub aggr_function: i32,
+ #[prost(message, repeated, tag="2")]
+ pub expr: ::prost::alloc::vec::Vec<PhysicalExprNode>,
+ #[prost(bool, tag="3")]
+ pub distinct: bool,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PhysicalWindowExprNode {
+ #[prost(message, optional, boxed, tag="4")]
+ pub expr: ::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
+ #[prost(oneof="physical_window_expr_node::WindowFunction", tags="1, 2")]
+ pub window_function: ::core::option::Option<physical_window_expr_node::WindowFunction>,
+}
+/// Nested message and enum types in `PhysicalWindowExprNode`.
+pub mod physical_window_expr_node {
+ #[derive(Clone, PartialEq, ::prost::Oneof)]
+ pub enum WindowFunction {
+ #[prost(enumeration="::datafusion_proto::protobuf::AggregateFunction", tag="1")]
+ AggrFunction(i32),
+ /// udaf = 3
+ #[prost(enumeration="::datafusion_proto::protobuf::BuiltInWindowFunction", tag="2")]
+ BuiltInFunction(i32),
+ }
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PhysicalIsNull {
+ #[prost(message, optional, boxed, tag="1")]
+ pub expr: ::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PhysicalIsNotNull {
+ #[prost(message, optional, boxed, tag="1")]
+ pub expr: ::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PhysicalNot {
+ #[prost(message, optional, boxed, tag="1")]
+ pub expr: ::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PhysicalAliasNode {
+ #[prost(message, optional, tag="1")]
+ pub expr: ::core::option::Option<PhysicalExprNode>,
+ #[prost(string, tag="2")]
+ pub alias: ::prost::alloc::string::String,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PhysicalBinaryExprNode {
+ #[prost(message, optional, boxed, tag="1")]
+ pub l: ::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
+ #[prost(message, optional, boxed, tag="2")]
+ pub r: ::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
+ #[prost(string, tag="3")]
+ pub op: ::prost::alloc::string::String,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PhysicalDateTimeIntervalExprNode {
+ #[prost(message, optional, boxed, tag="1")]
+ pub l: ::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
+ #[prost(message, optional, boxed, tag="2")]
+ pub r: ::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
+ #[prost(string, tag="3")]
+ pub op: ::prost::alloc::string::String,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PhysicalSortExprNode {
+ #[prost(message, optional, boxed, tag="1")]
+ pub expr: ::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
+ #[prost(bool, tag="2")]
+ pub asc: bool,
+ #[prost(bool, tag="3")]
+ pub nulls_first: bool,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PhysicalWhenThen {
+ #[prost(message, optional, tag="1")]
+ pub when_expr: ::core::option::Option<PhysicalExprNode>,
+ #[prost(message, optional, tag="2")]
+ pub then_expr: ::core::option::Option<PhysicalExprNode>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PhysicalInListNode {
+ #[prost(message, optional, boxed, tag="1")]
+ pub expr: ::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
+ #[prost(message, repeated, tag="2")]
+ pub list: ::prost::alloc::vec::Vec<PhysicalExprNode>,
+ #[prost(bool, tag="3")]
+ pub negated: bool,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PhysicalCaseNode {
+ #[prost(message, optional, boxed, tag="1")]
+ pub expr: ::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
+ #[prost(message, repeated, tag="2")]
+ pub when_then_expr: ::prost::alloc::vec::Vec<PhysicalWhenThen>,
+ #[prost(message, optional, boxed, tag="3")]
+ pub else_expr: ::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PhysicalScalarFunctionNode {
+ #[prost(string, tag="1")]
+ pub name: ::prost::alloc::string::String,
+ #[prost(enumeration="::datafusion_proto::protobuf::ScalarFunction", tag="2")]
+ pub fun: i32,
+ #[prost(message, repeated, tag="3")]
+ pub args: ::prost::alloc::vec::Vec<PhysicalExprNode>,
+ #[prost(message, optional, tag="4")]
+ pub return_type: ::core::option::Option<::datafusion_proto::protobuf::ArrowType>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PhysicalTryCastNode {
+ #[prost(message, optional, boxed, tag="1")]
+ pub expr: ::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
+ #[prost(message, optional, tag="2")]
+ pub arrow_type: ::core::option::Option<::datafusion_proto::protobuf::ArrowType>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PhysicalCastNode {
+ #[prost(message, optional, boxed, tag="1")]
+ pub expr: ::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
+ #[prost(message, optional, tag="2")]
+ pub arrow_type: ::core::option::Option<::datafusion_proto::protobuf::ArrowType>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PhysicalNegativeNode {
+ #[prost(message, optional, boxed, tag="1")]
+ pub expr: ::core::option::Option<::prost::alloc::boxed::Box<PhysicalExprNode>>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct UnresolvedShuffleExecNode {
+ #[prost(uint32, tag="1")]
+ pub stage_id: u32,
+ #[prost(message, optional, tag="2")]
+ pub schema: ::core::option::Option<::datafusion_proto::protobuf::Schema>,
+ #[prost(uint32, tag="3")]
+ pub input_partition_count: u32,
+ #[prost(uint32, tag="4")]
+ pub output_partition_count: u32,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct FilterExecNode {
+ #[prost(message, optional, boxed, tag="1")]
+ pub input: ::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
+ #[prost(message, optional, tag="2")]
+ pub expr: ::core::option::Option<PhysicalExprNode>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct FileGroup {
+ #[prost(message, repeated, tag="1")]
+ pub files: ::prost::alloc::vec::Vec<PartitionedFile>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ScanLimit {
+ /// wrap into a message to make it optional
+ #[prost(uint32, tag="1")]
+ pub limit: u32,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct FileScanExecConf {
+ #[prost(message, repeated, tag="1")]
+ pub file_groups: ::prost::alloc::vec::Vec<FileGroup>,
+ #[prost(message, optional, tag="2")]
+ pub schema: ::core::option::Option<::datafusion_proto::protobuf::Schema>,
+ #[prost(uint32, repeated, tag="4")]
+ pub projection: ::prost::alloc::vec::Vec<u32>,
+ #[prost(message, optional, tag="5")]
+ pub limit: ::core::option::Option<ScanLimit>,
+ #[prost(message, optional, tag="6")]
+ pub statistics: ::core::option::Option<Statistics>,
+ #[prost(string, repeated, tag="7")]
+ pub table_partition_cols: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
+ #[prost(string, tag="8")]
+ pub object_store_url: ::prost::alloc::string::String,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ParquetScanExecNode {
+ #[prost(message, optional, tag="1")]
+ pub base_conf: ::core::option::Option<FileScanExecConf>,
+ #[prost(message, optional, tag="2")]
+ pub pruning_predicate: ::core::option::Option<::datafusion_proto::protobuf::LogicalExprNode>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct CsvScanExecNode {
+ #[prost(message, optional, tag="1")]
+ pub base_conf: ::core::option::Option<FileScanExecConf>,
+ #[prost(bool, tag="2")]
+ pub has_header: bool,
+ #[prost(string, tag="3")]
+ pub delimiter: ::prost::alloc::string::String,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct AvroScanExecNode {
+ #[prost(message, optional, tag="1")]
+ pub base_conf: ::core::option::Option<FileScanExecConf>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct HashJoinExecNode {
+ #[prost(message, optional, boxed, tag="1")]
+ pub left: ::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
+ #[prost(message, optional, boxed, tag="2")]
+ pub right: ::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
+ #[prost(message, repeated, tag="3")]
+ pub on: ::prost::alloc::vec::Vec<JoinOn>,
+ #[prost(enumeration="::datafusion_proto::protobuf::JoinType", tag="4")]
+ pub join_type: i32,
+ #[prost(enumeration="PartitionMode", tag="6")]
+ pub partition_mode: i32,
+ #[prost(bool, tag="7")]
+ pub null_equals_null: bool,
+ #[prost(message, optional, tag="8")]
+ pub filter: ::core::option::Option<JoinFilter>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct UnionExecNode {
+ #[prost(message, repeated, tag="1")]
+ pub inputs: ::prost::alloc::vec::Vec<PhysicalPlanNode>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ExplainExecNode {
+ #[prost(message, optional, tag="1")]
+ pub schema: ::core::option::Option<::datafusion_proto::protobuf::Schema>,
+ #[prost(message, repeated, tag="2")]
+ pub stringified_plans: ::prost::alloc::vec::Vec<::datafusion_proto::protobuf::StringifiedPlan>,
+ #[prost(bool, tag="3")]
+ pub verbose: bool,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct CrossJoinExecNode {
+ #[prost(message, optional, boxed, tag="1")]
+ pub left: ::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
+ #[prost(message, optional, boxed, tag="2")]
+ pub right: ::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PhysicalColumn {
+ #[prost(string, tag="1")]
+ pub name: ::prost::alloc::string::String,
+ #[prost(uint32, tag="2")]
+ pub index: u32,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct JoinOn {
+ #[prost(message, optional, tag="1")]
+ pub left: ::core::option::Option<PhysicalColumn>,
+ #[prost(message, optional, tag="2")]
+ pub right: ::core::option::Option<PhysicalColumn>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct EmptyExecNode {
+ #[prost(bool, tag="1")]
+ pub produce_one_row: bool,
+ #[prost(message, optional, tag="2")]
+ pub schema: ::core::option::Option<::datafusion_proto::protobuf::Schema>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ProjectionExecNode {
+ #[prost(message, optional, boxed, tag="1")]
+ pub input: ::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
+ #[prost(message, repeated, tag="2")]
+ pub expr: ::prost::alloc::vec::Vec<PhysicalExprNode>,
+ #[prost(string, repeated, tag="3")]
+ pub expr_name: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct WindowAggExecNode {
+ #[prost(message, optional, boxed, tag="1")]
+ pub input: ::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
+ #[prost(message, repeated, tag="2")]
+ pub window_expr: ::prost::alloc::vec::Vec<PhysicalExprNode>,
+ #[prost(string, repeated, tag="3")]
+ pub window_expr_name: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
+ #[prost(message, optional, tag="4")]
+ pub input_schema: ::core::option::Option<::datafusion_proto::protobuf::Schema>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct AggregateExecNode {
+ #[prost(message, repeated, tag="1")]
+ pub group_expr: ::prost::alloc::vec::Vec<PhysicalExprNode>,
+ #[prost(message, repeated, tag="2")]
+ pub aggr_expr: ::prost::alloc::vec::Vec<PhysicalExprNode>,
+ #[prost(enumeration="AggregateMode", tag="3")]
+ pub mode: i32,
+ #[prost(message, optional, boxed, tag="4")]
+ pub input: ::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
+ #[prost(string, repeated, tag="5")]
+ pub group_expr_name: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
+ #[prost(string, repeated, tag="6")]
+ pub aggr_expr_name: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
+ /// we need the input schema to the partial aggregate to pass to the final aggregate
+ #[prost(message, optional, tag="7")]
+ pub input_schema: ::core::option::Option<::datafusion_proto::protobuf::Schema>,
+ #[prost(message, repeated, tag="8")]
+ pub null_expr: ::prost::alloc::vec::Vec<PhysicalExprNode>,
+ #[prost(bool, repeated, tag="9")]
+ pub groups: ::prost::alloc::vec::Vec<bool>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ShuffleWriterExecNode {
+ /// TODO it seems redundant to provide job and stage id here since we also have them
+ /// in the TaskDefinition that wraps this plan
+ #[prost(string, tag="1")]
+ pub job_id: ::prost::alloc::string::String,
+ #[prost(uint32, tag="2")]
+ pub stage_id: u32,
+ #[prost(message, optional, boxed, tag="3")]
+ pub input: ::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
+ #[prost(message, optional, tag="4")]
+ pub output_partitioning: ::core::option::Option<PhysicalHashRepartition>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ShuffleReaderExecNode {
+ #[prost(message, repeated, tag="1")]
+ pub partition: ::prost::alloc::vec::Vec<ShuffleReaderPartition>,
+ #[prost(message, optional, tag="2")]
+ pub schema: ::core::option::Option<::datafusion_proto::protobuf::Schema>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ShuffleReaderPartition {
+ /// each partition of a shuffle read can read data from multiple locations
+ #[prost(message, repeated, tag="1")]
+ pub location: ::prost::alloc::vec::Vec<PartitionLocation>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct GlobalLimitExecNode {
+ #[prost(message, optional, boxed, tag="1")]
+ pub input: ::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
+ /// The number of rows to skip before fetch
+ #[prost(uint32, tag="2")]
+ pub skip: u32,
+ /// Maximum number of rows to fetch; negative means no limit
+ #[prost(int64, tag="3")]
+ pub fetch: i64,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct LocalLimitExecNode {
+ #[prost(message, optional, boxed, tag="1")]
+ pub input: ::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
+ #[prost(uint32, tag="2")]
+ pub fetch: u32,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct SortExecNode {
+ #[prost(message, optional, boxed, tag="1")]
+ pub input: ::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
+ #[prost(message, repeated, tag="2")]
+ pub expr: ::prost::alloc::vec::Vec<PhysicalExprNode>,
+ /// Maximum number of highest/lowest rows to fetch; negative means no limit
+ #[prost(int64, tag="3")]
+ pub fetch: i64,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct SortPreservingMergeExecNode {
+ #[prost(message, optional, boxed, tag="1")]
+ pub input: ::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
+ #[prost(message, repeated, tag="2")]
+ pub expr: ::prost::alloc::vec::Vec<PhysicalExprNode>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct CoalesceBatchesExecNode {
+ #[prost(message, optional, boxed, tag="1")]
+ pub input: ::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
+ #[prost(uint32, tag="2")]
+ pub target_batch_size: u32,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct CoalescePartitionsExecNode {
+ #[prost(message, optional, boxed, tag="1")]
+ pub input: ::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PhysicalHashRepartition {
+ #[prost(message, repeated, tag="1")]
+ pub hash_expr: ::prost::alloc::vec::Vec<PhysicalExprNode>,
+ #[prost(uint64, tag="2")]
+ pub partition_count: u64,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct RepartitionExecNode {
+ #[prost(message, optional, boxed, tag="1")]
+ pub input: ::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
+ #[prost(oneof="repartition_exec_node::PartitionMethod", tags="2, 3, 4")]
+ pub partition_method: ::core::option::Option<repartition_exec_node::PartitionMethod>,
+}
+/// Nested message and enum types in `RepartitionExecNode`.
+pub mod repartition_exec_node {
+ #[derive(Clone, PartialEq, ::prost::Oneof)]
+ pub enum PartitionMethod {
+ #[prost(uint64, tag="2")]
+ RoundRobin(u64),
+ #[prost(message, tag="3")]
+ Hash(super::PhysicalHashRepartition),
+ #[prost(uint64, tag="4")]
+ Unknown(u64),
+ }
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct JoinFilter {
+ #[prost(message, optional, tag="1")]
+ pub expression: ::core::option::Option<PhysicalExprNode>,
+ #[prost(message, repeated, tag="2")]
+ pub column_indices: ::prost::alloc::vec::Vec<ColumnIndex>,
+ #[prost(message, optional, tag="3")]
+ pub schema: ::core::option::Option<::datafusion_proto::protobuf::Schema>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ColumnIndex {
+ #[prost(uint32, tag="1")]
+ pub index: u32,
+ #[prost(enumeration="JoinSide", tag="2")]
+ pub side: i32,
+}
+/// /////////////////////////////////////////////////////////////////////////////////////////////////
+/// Ballista Scheduling
+/// /////////////////////////////////////////////////////////////////////////////////////////////////
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ExecutionGraph {
+ #[prost(string, tag="1")]
+ pub job_id: ::prost::alloc::string::String,
+ #[prost(string, tag="2")]
+ pub session_id: ::prost::alloc::string::String,
+ #[prost(message, optional, tag="3")]
+ pub status: ::core::option::Option<JobStatus>,
+ #[prost(message, repeated, tag="4")]
+ pub stages: ::prost::alloc::vec::Vec<ExecutionGraphStage>,
+ #[prost(uint64, tag="5")]
+ pub output_partitions: u64,
+ #[prost(message, repeated, tag="6")]
+ pub output_locations: ::prost::alloc::vec::Vec<PartitionLocation>,
+ #[prost(string, tag="7")]
+ pub scheduler_id: ::prost::alloc::string::String,
+ #[prost(uint32, tag="8")]
+ pub task_id_gen: u32,
+ #[prost(message, repeated, tag="9")]
+ pub failed_attempts: ::prost::alloc::vec::Vec<StageAttempts>,
+ #[prost(string, tag="10")]
+ pub job_name: ::prost::alloc::string::String,
+ #[prost(uint64, tag="11")]
+ pub start_time: u64,
+ #[prost(uint64, tag="12")]
+ pub end_time: u64,
+ #[prost(uint64, tag="13")]
+ pub queued_at: u64,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct StageAttempts {
+ #[prost(uint32, tag="1")]
+ pub stage_id: u32,
+ #[prost(uint32, repeated, tag="2")]
+ pub stage_attempt_num: ::prost::alloc::vec::Vec<u32>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ExecutionGraphStage {
+ #[prost(oneof="execution_graph_stage::StageType", tags="1, 2, 3, 4")]
+ pub stage_type: ::core::option::Option<execution_graph_stage::StageType>,
+}
+/// Nested message and enum types in `ExecutionGraphStage`.
+pub mod execution_graph_stage {
+ #[derive(Clone, PartialEq, ::prost::Oneof)]
+ pub enum StageType {
+ #[prost(message, tag="1")]
+ UnresolvedStage(super::UnResolvedStage),
+ #[prost(message, tag="2")]
+ ResolvedStage(super::ResolvedStage),
+ #[prost(message, tag="3")]
+ SuccessfulStage(super::SuccessfulStage),
+ #[prost(message, tag="4")]
+ FailedStage(super::FailedStage),
+ }
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct UnResolvedStage {
+ #[prost(uint32, tag="1")]
+ pub stage_id: u32,
+ #[prost(message, optional, tag="2")]
+ pub output_partitioning: ::core::option::Option<PhysicalHashRepartition>,
+ #[prost(uint32, repeated, tag="3")]
+ pub output_links: ::prost::alloc::vec::Vec<u32>,
+ #[prost(message, repeated, tag="4")]
+ pub inputs: ::prost::alloc::vec::Vec<GraphStageInput>,
+ #[prost(bytes="vec", tag="5")]
+ pub plan: ::prost::alloc::vec::Vec<u8>,
+ #[prost(uint32, tag="6")]
+ pub stage_attempt_num: u32,
+ #[prost(string, repeated, tag="7")]
+ pub last_attempt_failure_reasons: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ResolvedStage {
+ #[prost(uint32, tag="1")]
+ pub stage_id: u32,
+ #[prost(uint32, tag="2")]
+ pub partitions: u32,
+ #[prost(message, optional, tag="3")]
+ pub output_partitioning: ::core::option::Option<PhysicalHashRepartition>,
+ #[prost(uint32, repeated, tag="4")]
+ pub output_links: ::prost::alloc::vec::Vec<u32>,
+ #[prost(message, repeated, tag="5")]
+ pub inputs: ::prost::alloc::vec::Vec<GraphStageInput>,
+ #[prost(bytes="vec", tag="6")]
+ pub plan: ::prost::alloc::vec::Vec<u8>,
+ #[prost(uint32, tag="7")]
+ pub stage_attempt_num: u32,
+ #[prost(string, repeated, tag="8")]
+ pub last_attempt_failure_reasons: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct SuccessfulStage {
+ #[prost(uint32, tag="1")]
+ pub stage_id: u32,
+ #[prost(uint32, tag="2")]
+ pub partitions: u32,
+ #[prost(message, optional, tag="3")]
+ pub output_partitioning: ::core::option::Option<PhysicalHashRepartition>,
+ #[prost(uint32, repeated, tag="4")]
+ pub output_links: ::prost::alloc::vec::Vec<u32>,
+ #[prost(message, repeated, tag="5")]
+ pub inputs: ::prost::alloc::vec::Vec<GraphStageInput>,
+ #[prost(bytes="vec", tag="6")]
+ pub plan: ::prost::alloc::vec::Vec<u8>,
+ #[prost(message, repeated, tag="7")]
+ pub task_infos: ::prost::alloc::vec::Vec<TaskInfo>,
+ #[prost(message, repeated, tag="8")]
+ pub stage_metrics: ::prost::alloc::vec::Vec<OperatorMetricsSet>,
+ #[prost(uint32, tag="9")]
+ pub stage_attempt_num: u32,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct FailedStage {
+ #[prost(uint32, tag="1")]
+ pub stage_id: u32,
+ #[prost(uint32, tag="2")]
+ pub partitions: u32,
+ #[prost(message, optional, tag="3")]
+ pub output_partitioning: ::core::option::Option<PhysicalHashRepartition>,
+ #[prost(uint32, repeated, tag="4")]
+ pub output_links: ::prost::alloc::vec::Vec<u32>,
+ #[prost(bytes="vec", tag="5")]
+ pub plan: ::prost::alloc::vec::Vec<u8>,
+ #[prost(message, repeated, tag="6")]
+ pub task_infos: ::prost::alloc::vec::Vec<TaskInfo>,
+ #[prost(message, repeated, tag="7")]
+ pub stage_metrics: ::prost::alloc::vec::Vec<OperatorMetricsSet>,
+ #[prost(string, tag="8")]
+ pub error_message: ::prost::alloc::string::String,
+ #[prost(uint32, tag="9")]
+ pub stage_attempt_num: u32,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct TaskInfo {
+ #[prost(uint32, tag="1")]
+ pub task_id: u32,
+ #[prost(uint32, tag="2")]
+ pub partition_id: u32,
+ /// Scheduler schedule time
+ #[prost(uint64, tag="3")]
+ pub scheduled_time: u64,
+ /// Scheduler launch time
+ #[prost(uint64, tag="4")]
+ pub launch_time: u64,
+ /// The time the Executor start to run the task
+ #[prost(uint64, tag="5")]
+ pub start_exec_time: u64,
+ /// The time the Executor finish the task
+ #[prost(uint64, tag="6")]
+ pub end_exec_time: u64,
+ /// Scheduler side finish time
+ #[prost(uint64, tag="7")]
+ pub finish_time: u64,
+ #[prost(oneof="task_info::Status", tags="8, 9, 10")]
+ pub status: ::core::option::Option<task_info::Status>,
+}
+/// Nested message and enum types in `TaskInfo`.
+pub mod task_info {
+ #[derive(Clone, PartialEq, ::prost::Oneof)]
+ pub enum Status {
+ #[prost(message, tag="8")]
+ Running(super::RunningTask),
+ #[prost(message, tag="9")]
+ Failed(super::FailedTask),
+ #[prost(message, tag="10")]
+ Successful(super::SuccessfulTask),
+ }
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct GraphStageInput {
+ #[prost(uint32, tag="1")]
+ pub stage_id: u32,
+ #[prost(message, repeated, tag="2")]
+ pub partition_locations: ::prost::alloc::vec::Vec<TaskInputPartitions>,
+ #[prost(bool, tag="3")]
+ pub complete: bool,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct TaskInputPartitions {
+ #[prost(uint32, tag="1")]
+ pub partition: u32,
+ #[prost(message, repeated, tag="2")]
+ pub partition_location: ::prost::alloc::vec::Vec<PartitionLocation>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct KeyValuePair {
+ #[prost(string, tag="1")]
+ pub key: ::prost::alloc::string::String,
+ #[prost(string, tag="2")]
+ pub value: ::prost::alloc::string::String,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct Action {
+ /// configuration settings
+ #[prost(message, repeated, tag="100")]
+ pub settings: ::prost::alloc::vec::Vec<KeyValuePair>,
+ #[prost(oneof="action::ActionType", tags="3")]
+ pub action_type: ::core::option::Option<action::ActionType>,
+}
+/// Nested message and enum types in `Action`.
+pub mod action {
+ #[derive(Clone, PartialEq, ::prost::Oneof)]
+ pub enum ActionType {
+ /// Fetch a partition from an executor
+ #[prost(message, tag="3")]
+ FetchPartition(super::FetchPartition),
+ }
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ExecutePartition {
+ #[prost(string, tag="1")]
+ pub job_id: ::prost::alloc::string::String,
+ #[prost(uint32, tag="2")]
+ pub stage_id: u32,
+ #[prost(uint32, repeated, tag="3")]
+ pub partition_id: ::prost::alloc::vec::Vec<u32>,
+ #[prost(message, optional, tag="4")]
+ pub plan: ::core::option::Option<PhysicalPlanNode>,
+ /// The task could need to read partitions from other executors
+ #[prost(message, repeated, tag="5")]
+ pub partition_location: ::prost::alloc::vec::Vec<PartitionLocation>,
+ /// Output partition for shuffle writer
+ #[prost(message, optional, tag="6")]
+ pub output_partitioning: ::core::option::Option<PhysicalHashRepartition>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct FetchPartition {
+ #[prost(string, tag="1")]
+ pub job_id: ::prost::alloc::string::String,
+ #[prost(uint32, tag="2")]
+ pub stage_id: u32,
+ #[prost(uint32, tag="3")]
+ pub partition_id: u32,
+ #[prost(string, tag="4")]
+ pub path: ::prost::alloc::string::String,
+ #[prost(string, tag="5")]
+ pub host: ::prost::alloc::string::String,
+ #[prost(uint32, tag="6")]
+ pub port: u32,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PartitionLocation {
+ /// partition_id of the map stage who produces the shuffle.
+ #[prost(uint32, tag="1")]
+ pub map_partition_id: u32,
+ /// partition_id of the shuffle, a composition of(job_id + map_stage_id + partition_id).
+ #[prost(message, optional, tag="2")]
+ pub partition_id: ::core::option::Option<PartitionId>,
+ #[prost(message, optional, tag="3")]
+ pub executor_meta: ::core::option::Option<ExecutorMetadata>,
+ #[prost(message, optional, tag="4")]
+ pub partition_stats: ::core::option::Option<PartitionStats>,
+ #[prost(string, tag="5")]
+ pub path: ::prost::alloc::string::String,
+}
+/// Unique identifier for a materialized partition of data
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PartitionId {
+ #[prost(string, tag="1")]
+ pub job_id: ::prost::alloc::string::String,
+ #[prost(uint32, tag="2")]
+ pub stage_id: u32,
+ #[prost(uint32, tag="4")]
+ pub partition_id: u32,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct TaskId {
+ #[prost(uint32, tag="1")]
+ pub task_id: u32,
+ #[prost(uint32, tag="2")]
+ pub task_attempt_num: u32,
+ #[prost(uint32, tag="3")]
+ pub partition_id: u32,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PartitionStats {
+ #[prost(int64, tag="1")]
+ pub num_rows: i64,
+ #[prost(int64, tag="2")]
+ pub num_batches: i64,
+ #[prost(int64, tag="3")]
+ pub num_bytes: i64,
+ #[prost(message, repeated, tag="4")]
+ pub column_stats: ::prost::alloc::vec::Vec<ColumnStats>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ColumnStats {
+ #[prost(message, optional, tag="1")]
+ pub min_value: ::core::option::Option<::datafusion_proto::protobuf::ScalarValue>,
+ #[prost(message, optional, tag="2")]
+ pub max_value: ::core::option::Option<::datafusion_proto::protobuf::ScalarValue>,
+ #[prost(uint32, tag="3")]
+ pub null_count: u32,
+ #[prost(uint32, tag="4")]
+ pub distinct_count: u32,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct OperatorMetricsSet {
+ #[prost(message, repeated, tag="1")]
+ pub metrics: ::prost::alloc::vec::Vec<OperatorMetric>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct NamedCount {
+ #[prost(string, tag="1")]
+ pub name: ::prost::alloc::string::String,
+ #[prost(uint64, tag="2")]
+ pub value: u64,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct NamedGauge {
+ #[prost(string, tag="1")]
+ pub name: ::prost::alloc::string::String,
+ #[prost(uint64, tag="2")]
+ pub value: u64,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct NamedTime {
+ #[prost(string, tag="1")]
+ pub name: ::prost::alloc::string::String,
+ #[prost(uint64, tag="2")]
+ pub value: u64,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct OperatorMetric {
+ #[prost(oneof="operator_metric::Metric", tags="1, 2, 3, 4, 5, 6, 7, 8, 9, 10")]
+ pub metric: ::core::option::Option<operator_metric::Metric>,
+}
+/// Nested message and enum types in `OperatorMetric`.
+pub mod operator_metric {
+ #[derive(Clone, PartialEq, ::prost::Oneof)]
+ pub enum Metric {
+ #[prost(uint64, tag="1")]
+ OutputRows(u64),
+ #[prost(uint64, tag="2")]
+ ElapseTime(u64),
+ #[prost(uint64, tag="3")]
+ SpillCount(u64),
+ #[prost(uint64, tag="4")]
+ SpilledBytes(u64),
+ #[prost(uint64, tag="5")]
+ CurrentMemoryUsage(u64),
+ #[prost(message, tag="6")]
+ Count(super::NamedCount),
+ #[prost(message, tag="7")]
+ Gauge(super::NamedGauge),
+ #[prost(message, tag="8")]
+ Time(super::NamedTime),
+ #[prost(int64, tag="9")]
+ StartTimestamp(i64),
+ #[prost(int64, tag="10")]
+ EndTimestamp(i64),
+ }
+}
+/// Used by scheduler
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ExecutorMetadata {
+ #[prost(string, tag="1")]
+ pub id: ::prost::alloc::string::String,
+ #[prost(string, tag="2")]
+ pub host: ::prost::alloc::string::String,
+ #[prost(uint32, tag="3")]
+ pub port: u32,
+ #[prost(uint32, tag="4")]
+ pub grpc_port: u32,
+ #[prost(message, optional, tag="5")]
+ pub specification: ::core::option::Option<ExecutorSpecification>,
+}
+/// Used by grpc
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ExecutorRegistration {
+ #[prost(string, tag="1")]
+ pub id: ::prost::alloc::string::String,
+ #[prost(uint32, tag="3")]
+ pub port: u32,
+ #[prost(uint32, tag="4")]
+ pub grpc_port: u32,
+ #[prost(message, optional, tag="5")]
+ pub specification: ::core::option::Option<ExecutorSpecification>,
+ /// "optional" keyword is stable in protoc 3.15 but prost is still on 3.14 (see <https://github.com/tokio-rs/prost/issues/430> and <https://github.com/tokio-rs/prost/pull/455>)
+ /// this syntax is ugly but is binary compatible with the "optional" keyword (see <https://stackoverflow.com/questions/42622015/how-to-define-an-optional-field-in-protobuf-3>)
+ #[prost(oneof="executor_registration::OptionalHost", tags="2")]
+ pub optional_host: ::core::option::Option<executor_registration::OptionalHost>,
+}
+/// Nested message and enum types in `ExecutorRegistration`.
+pub mod executor_registration {
+ /// "optional" keyword is stable in protoc 3.15 but prost is still on 3.14 (see <https://github.com/tokio-rs/prost/issues/430> and <https://github.com/tokio-rs/prost/pull/455>)
+ /// this syntax is ugly but is binary compatible with the "optional" keyword (see <https://stackoverflow.com/questions/42622015/how-to-define-an-optional-field-in-protobuf-3>)
+ #[derive(Clone, PartialEq, ::prost::Oneof)]
+ pub enum OptionalHost {
+ #[prost(string, tag="2")]
+ Host(::prost::alloc::string::String),
+ }
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ExecutorHeartbeat {
+ #[prost(string, tag="1")]
+ pub executor_id: ::prost::alloc::string::String,
+ /// Unix epoch-based timestamp in seconds
+ #[prost(uint64, tag="2")]
+ pub timestamp: u64,
+ #[prost(message, repeated, tag="3")]
+ pub metrics: ::prost::alloc::vec::Vec<ExecutorMetric>,
+ #[prost(message, optional, tag="4")]
+ pub status: ::core::option::Option<ExecutorStatus>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ExecutorMetric {
+ /// TODO add more metrics
+ #[prost(oneof="executor_metric::Metric", tags="1")]
+ pub metric: ::core::option::Option<executor_metric::Metric>,
+}
+/// Nested message and enum types in `ExecutorMetric`.
+pub mod executor_metric {
+ /// TODO add more metrics
+ #[derive(Clone, PartialEq, ::prost::Oneof)]
+ pub enum Metric {
+ #[prost(uint64, tag="1")]
+ AvailableMemory(u64),
+ }
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ExecutorStatus {
+ #[prost(oneof="executor_status::Status", tags="1, 2, 3")]
+ pub status: ::core::option::Option<executor_status::Status>,
+}
+/// Nested message and enum types in `ExecutorStatus`.
+pub mod executor_status {
+ #[derive(Clone, PartialEq, ::prost::Oneof)]
+ pub enum Status {
+ #[prost(string, tag="1")]
+ Active(::prost::alloc::string::String),
+ #[prost(string, tag="2")]
+ Dead(::prost::alloc::string::String),
+ #[prost(string, tag="3")]
+ Unknown(::prost::alloc::string::String),
+ }
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ExecutorSpecification {
+ #[prost(message, repeated, tag="1")]
+ pub resources: ::prost::alloc::vec::Vec<ExecutorResource>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ExecutorResource {
+ /// TODO add more resources
+ #[prost(oneof="executor_resource::Resource", tags="1")]
+ pub resource: ::core::option::Option<executor_resource::Resource>,
+}
+/// Nested message and enum types in `ExecutorResource`.
+pub mod executor_resource {
+ /// TODO add more resources
+ #[derive(Clone, PartialEq, ::prost::Oneof)]
+ pub enum Resource {
+ #[prost(uint32, tag="1")]
+ TaskSlots(u32),
+ }
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ExecutorData {
+ #[prost(string, tag="1")]
+ pub executor_id: ::prost::alloc::string::String,
+ #[prost(message, repeated, tag="2")]
+ pub resources: ::prost::alloc::vec::Vec<ExecutorResourcePair>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ExecutorResourcePair {
+ #[prost(message, optional, tag="1")]
+ pub total: ::core::option::Option<ExecutorResource>,
+ #[prost(message, optional, tag="2")]
+ pub available: ::core::option::Option<ExecutorResource>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct RunningTask {
+ #[prost(string, tag="1")]
+ pub executor_id: ::prost::alloc::string::String,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct FailedTask {
+ #[prost(string, tag="1")]
+ pub error: ::prost::alloc::string::String,
+ #[prost(bool, tag="2")]
+ pub retryable: bool,
+ /// Whether this task failure should be counted to the maximum number of times the task is allowed to retry
+ #[prost(bool, tag="3")]
+ pub count_to_failures: bool,
+ #[prost(oneof="failed_task::FailedReason", tags="4, 5, 6, 7, 8, 9")]
+ pub failed_reason: ::core::option::Option<failed_task::FailedReason>,
+}
+/// Nested message and enum types in `FailedTask`.
+pub mod failed_task {
+ #[derive(Clone, PartialEq, ::prost::Oneof)]
+ pub enum FailedReason {
+ #[prost(message, tag="4")]
+ ExecutionError(super::ExecutionError),
+ #[prost(message, tag="5")]
+ FetchPartitionError(super::FetchPartitionError),
+ #[prost(message, tag="6")]
+ IoError(super::IoError),
+ #[prost(message, tag="7")]
+ ExecutorLost(super::ExecutorLost),
+ /// A successful task's result is lost due to executor lost
+ #[prost(message, tag="8")]
+ ResultLost(super::ResultLost),
+ #[prost(message, tag="9")]
+ TaskKilled(super::TaskKilled),
+ }
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct SuccessfulTask {
+ #[prost(string, tag="1")]
+ pub executor_id: ::prost::alloc::string::String,
+ /// TODO tasks are currently always shuffle writes but this will not always be the case
+ /// so we might want to think about some refactoring of the task definitions
+ #[prost(message, repeated, tag="2")]
+ pub partitions: ::prost::alloc::vec::Vec<ShuffleWritePartition>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ExecutionError {
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct FetchPartitionError {
+ #[prost(string, tag="1")]
+ pub executor_id: ::prost::alloc::string::String,
+ #[prost(uint32, tag="2")]
+ pub map_stage_id: u32,
+ #[prost(uint32, tag="3")]
+ pub map_partition_id: u32,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct IoError {
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ExecutorLost {
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ResultLost {
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct TaskKilled {
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ShuffleWritePartition {
+ #[prost(uint64, tag="1")]
+ pub partition_id: u64,
+ #[prost(string, tag="2")]
+ pub path: ::prost::alloc::string::String,
+ #[prost(uint64, tag="3")]
+ pub num_batches: u64,
+ #[prost(uint64, tag="4")]
+ pub num_rows: u64,
+ #[prost(uint64, tag="5")]
+ pub num_bytes: u64,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct TaskStatus {
+ #[prost(uint32, tag="1")]
+ pub task_id: u32,
+ #[prost(string, tag="2")]
+ pub job_id: ::prost::alloc::string::String,
+ #[prost(uint32, tag="3")]
+ pub stage_id: u32,
+ #[prost(uint32, tag="4")]
+ pub stage_attempt_num: u32,
+ #[prost(uint32, tag="5")]
+ pub partition_id: u32,
+ #[prost(uint64, tag="6")]
+ pub launch_time: u64,
+ #[prost(uint64, tag="7")]
+ pub start_exec_time: u64,
+ #[prost(uint64, tag="8")]
+ pub end_exec_time: u64,
+ #[prost(message, repeated, tag="12")]
+ pub metrics: ::prost::alloc::vec::Vec<OperatorMetricsSet>,
+ #[prost(oneof="task_status::Status", tags="9, 10, 11")]
+ pub status: ::core::option::Option<task_status::Status>,
+}
+/// Nested message and enum types in `TaskStatus`.
+pub mod task_status {
+ #[derive(Clone, PartialEq, ::prost::Oneof)]
+ pub enum Status {
+ #[prost(message, tag="9")]
+ Running(super::RunningTask),
+ #[prost(message, tag="10")]
+ Failed(super::FailedTask),
+ #[prost(message, tag="11")]
+ Successful(super::SuccessfulTask),
+ }
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PollWorkParams {
+ #[prost(message, optional, tag="1")]
+ pub metadata: ::core::option::Option<ExecutorRegistration>,
+ #[prost(uint32, tag="2")]
+ pub num_free_slots: u32,
+ /// All tasks must be reported until they reach the failed or completed state
+ #[prost(message, repeated, tag="3")]
+ pub task_status: ::prost::alloc::vec::Vec<TaskStatus>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct TaskDefinition {
+ #[prost(uint32, tag="1")]
+ pub task_id: u32,
+ #[prost(uint32, tag="2")]
+ pub task_attempt_num: u32,
+ #[prost(string, tag="3")]
+ pub job_id: ::prost::alloc::string::String,
+ #[prost(uint32, tag="4")]
+ pub stage_id: u32,
+ #[prost(uint32, tag="5")]
+ pub stage_attempt_num: u32,
+ #[prost(uint32, tag="6")]
+ pub partition_id: u32,
+ #[prost(bytes="vec", tag="7")]
+ pub plan: ::prost::alloc::vec::Vec<u8>,
+ /// Output partition for shuffle writer
+ #[prost(message, optional, tag="8")]
+ pub output_partitioning: ::core::option::Option<PhysicalHashRepartition>,
+ #[prost(string, tag="9")]
+ pub session_id: ::prost::alloc::string::String,
+ #[prost(uint64, tag="10")]
+ pub launch_time: u64,
+ #[prost(message, repeated, tag="11")]
+ pub props: ::prost::alloc::vec::Vec<KeyValuePair>,
+}
+/// A set of tasks in the same stage
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct MultiTaskDefinition {
+ #[prost(message, repeated, tag="1")]
+ pub task_ids: ::prost::alloc::vec::Vec<TaskId>,
+ #[prost(string, tag="2")]
+ pub job_id: ::prost::alloc::string::String,
+ #[prost(uint32, tag="3")]
+ pub stage_id: u32,
+ #[prost(uint32, tag="4")]
+ pub stage_attempt_num: u32,
+ #[prost(bytes="vec", tag="5")]
+ pub plan: ::prost::alloc::vec::Vec<u8>,
+ /// Output partition for shuffle writer
+ #[prost(message, optional, tag="6")]
+ pub output_partitioning: ::core::option::Option<PhysicalHashRepartition>,
+ #[prost(string, tag="7")]
+ pub session_id: ::prost::alloc::string::String,
+ #[prost(uint64, tag="8")]
+ pub launch_time: u64,
+ #[prost(message, repeated, tag="9")]
+ pub props: ::prost::alloc::vec::Vec<KeyValuePair>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct SessionSettings {
+ #[prost(message, repeated, tag="1")]
+ pub configs: ::prost::alloc::vec::Vec<KeyValuePair>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct JobSessionConfig {
+ #[prost(string, tag="1")]
+ pub session_id: ::prost::alloc::string::String,
+ #[prost(message, repeated, tag="2")]
+ pub configs: ::prost::alloc::vec::Vec<KeyValuePair>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PollWorkResult {
+ #[prost(message, repeated, tag="1")]
+ pub tasks: ::prost::alloc::vec::Vec<TaskDefinition>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct RegisterExecutorParams {
+ #[prost(message, optional, tag="1")]
+ pub metadata: ::core::option::Option<ExecutorRegistration>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct RegisterExecutorResult {
+ #[prost(bool, tag="1")]
+ pub success: bool,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct HeartBeatParams {
+ #[prost(string, tag="1")]
+ pub executor_id: ::prost::alloc::string::String,
+ #[prost(message, repeated, tag="2")]
+ pub metrics: ::prost::alloc::vec::Vec<ExecutorMetric>,
+ #[prost(message, optional, tag="3")]
+ pub status: ::core::option::Option<ExecutorStatus>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct HeartBeatResult {
+ /// TODO it's from Spark for BlockManager
+ #[prost(bool, tag="1")]
+ pub reregister: bool,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct StopExecutorParams {
+ #[prost(string, tag="1")]
+ pub executor_id: ::prost::alloc::string::String,
+ /// stop reason
+ #[prost(string, tag="2")]
+ pub reason: ::prost::alloc::string::String,
+ /// force to stop the executor immediately
+ #[prost(bool, tag="3")]
+ pub force: bool,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct StopExecutorResult {
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ExecutorStoppedParams {
+ #[prost(string, tag="1")]
+ pub executor_id: ::prost::alloc::string::String,
+ /// stop reason
+ #[prost(string, tag="2")]
+ pub reason: ::prost::alloc::string::String,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ExecutorStoppedResult {
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct UpdateTaskStatusParams {
+ #[prost(string, tag="1")]
+ pub executor_id: ::prost::alloc::string::String,
+ /// All tasks must be reported until they reach the failed or completed state
+ #[prost(message, repeated, tag="2")]
+ pub task_status: ::prost::alloc::vec::Vec<TaskStatus>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct UpdateTaskStatusResult {
+ #[prost(bool, tag="1")]
+ pub success: bool,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ExecuteQueryParams {
+ #[prost(message, repeated, tag="4")]
+ pub settings: ::prost::alloc::vec::Vec<KeyValuePair>,
+ #[prost(oneof="execute_query_params::Query", tags="1, 2")]
+ pub query: ::core::option::Option<execute_query_params::Query>,
+ #[prost(oneof="execute_query_params::OptionalSessionId", tags="3")]
+ pub optional_session_id: ::core::option::Option<execute_query_params::OptionalSessionId>,
+}
+/// Nested message and enum types in `ExecuteQueryParams`.
+pub mod execute_query_params {
+ #[derive(Clone, PartialEq, ::prost::Oneof)]
+ pub enum Query {
+ #[prost(bytes, tag="1")]
+ LogicalPlan(::prost::alloc::vec::Vec<u8>),
+ #[prost(string, tag="2")]
+ Sql(::prost::alloc::string::String),
+ }
+ #[derive(Clone, PartialEq, ::prost::Oneof)]
+ pub enum OptionalSessionId {
+ #[prost(string, tag="3")]
+ SessionId(::prost::alloc::string::String),
+ }
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ExecuteSqlParams {
+ #[prost(string, tag="1")]
+ pub sql: ::prost::alloc::string::String,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ExecuteQueryResult {
+ #[prost(string, tag="1")]
+ pub job_id: ::prost::alloc::string::String,
+ #[prost(string, tag="2")]
+ pub session_id: ::prost::alloc::string::String,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct GetJobStatusParams {
+ #[prost(string, tag="1")]
+ pub job_id: ::prost::alloc::string::String,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct SuccessfulJob {
+ #[prost(message, repeated, tag="1")]
+ pub partition_location: ::prost::alloc::vec::Vec<PartitionLocation>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct QueuedJob {
+}
+/// TODO: add progress report
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct RunningJob {
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct FailedJob {
+ #[prost(string, tag="1")]
+ pub error: ::prost::alloc::string::String,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct JobStatus {
+ #[prost(oneof="job_status::Status", tags="1, 2, 3, 4")]
+ pub status: ::core::option::Option<job_status::Status>,
+}
+/// Nested message and enum types in `JobStatus`.
+pub mod job_status {
+ #[derive(Clone, PartialEq, ::prost::Oneof)]
+ pub enum Status {
+ #[prost(message, tag="1")]
+ Queued(super::QueuedJob),
+ #[prost(message, tag="2")]
+ Running(super::RunningJob),
+ #[prost(message, tag="3")]
+ Failed(super::FailedJob),
+ #[prost(message, tag="4")]
+ Successful(super::SuccessfulJob),
+ }
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct GetJobStatusResult {
+ #[prost(message, optional, tag="1")]
+ pub status: ::core::option::Option<JobStatus>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct GetFileMetadataParams {
+ #[prost(string, tag="1")]
+ pub path: ::prost::alloc::string::String,
+ #[prost(string, tag="2")]
+ pub file_type: ::prost::alloc::string::String,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct GetFileMetadataResult {
+ #[prost(message, optional, tag="1")]
+ pub schema: ::core::option::Option<::datafusion_proto::protobuf::Schema>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct FilePartitionMetadata {
+ #[prost(string, repeated, tag="1")]
+ pub filename: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct CancelJobParams {
+ #[prost(string, tag="1")]
+ pub job_id: ::prost::alloc::string::String,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct CancelJobResult {
+ #[prost(bool, tag="1")]
+ pub cancelled: bool,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct CleanJobDataParams {
+ #[prost(string, tag="1")]
+ pub job_id: ::prost::alloc::string::String,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct CleanJobDataResult {
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct LaunchTaskParams {
+ /// Allow to launch a task set to an executor at once
+ #[prost(message, repeated, tag="1")]
+ pub tasks: ::prost::alloc::vec::Vec<TaskDefinition>,
+ #[prost(string, tag="2")]
+ pub scheduler_id: ::prost::alloc::string::String,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct LaunchMultiTaskParams {
+ /// Allow to launch a task set to an executor at once
+ #[prost(message, repeated, tag="1")]
+ pub multi_tasks: ::prost::alloc::vec::Vec<MultiTaskDefinition>,
+ #[prost(string, tag="2")]
+ pub scheduler_id: ::prost::alloc::string::String,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct LaunchTaskResult {
+ /// TODO when part of the task set are scheduled successfully
+ #[prost(bool, tag="1")]
+ pub success: bool,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct LaunchMultiTaskResult {
+ /// TODO when part of the task set are scheduled successfully
+ #[prost(bool, tag="1")]
+ pub success: bool,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct CancelTasksParams {
+ #[prost(message, repeated, tag="1")]
+ pub task_infos: ::prost::alloc::vec::Vec<RunningTaskInfo>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct CancelTasksResult {
+ #[prost(bool, tag="1")]
+ pub cancelled: bool,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct RemoveJobDataParams {
+ #[prost(string, tag="1")]
+ pub job_id: ::prost::alloc::string::String,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct RemoveJobDataResult {
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct RunningTaskInfo {
+ #[prost(uint32, tag="1")]
+ pub task_id: u32,
+ #[prost(string, tag="2")]
+ pub job_id: ::prost::alloc::string::String,
+ #[prost(uint32, tag="3")]
+ pub stage_id: u32,
+ #[prost(uint32, tag="4")]
+ pub partition_id: u32,
+}
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
+#[repr(i32)]
+pub enum PartitionMode {
+ CollectLeft = 0,
+ Partitioned = 1,
+}
+impl PartitionMode {
+ /// String value of the enum field names used in the ProtoBuf definition.
+ ///
+ /// The values are not transformed in any way and thus are considered stable
+ /// (if the ProtoBuf definition does not change) and safe for programmatic use.
+ pub fn as_str_name(&self) -> &'static str {
+ match self {
+ PartitionMode::CollectLeft => "COLLECT_LEFT",
+ PartitionMode::Partitioned => "PARTITIONED",
+ }
+ }
+}
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
+#[repr(i32)]
+pub enum AggregateMode {
+ Partial = 0,
+ Final = 1,
+ FinalPartitioned = 2,
+}
+impl AggregateMode {
+ /// String value of the enum field names used in the ProtoBuf definition.
+ ///
+ /// The values are not transformed in any way and thus are considered stable
+ /// (if the ProtoBuf definition does not change) and safe for programmatic use.
+ pub fn as_str_name(&self) -> &'static str {
+ match self {
+ AggregateMode::Partial => "PARTIAL",
+ AggregateMode::Final => "FINAL",
+ AggregateMode::FinalPartitioned => "FINAL_PARTITIONED",
+ }
+ }
+}
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
+#[repr(i32)]
+pub enum JoinSide {
+ LeftSide = 0,
+ RightSide = 1,
+}
+impl JoinSide {
+ /// String value of the enum field names used in the ProtoBuf definition.
+ ///
+ /// The values are not transformed in any way and thus are considered stable
+ /// (if the ProtoBuf definition does not change) and safe for programmatic use.
+ pub fn as_str_name(&self) -> &'static str {
+ match self {
+ JoinSide::LeftSide => "LEFT_SIDE",
+ JoinSide::RightSide => "RIGHT_SIDE",
+ }
+ }
+}
+/// Generated client implementations.
+pub mod scheduler_grpc_client {
+ #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
+ use tonic::codegen::*;
+ use tonic::codegen::http::Uri;
+ #[derive(Debug, Clone)]
+ pub struct SchedulerGrpcClient<T> {
+ inner: tonic::client::Grpc<T>,
+ }
+ impl SchedulerGrpcClient<tonic::transport::Channel> {
+ /// Attempt to create a new client by connecting to a given endpoint.
+ pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
+ where
+ D: std::convert::TryInto<tonic::transport::Endpoint>,
+ D::Error: Into<StdError>,
+ {
+ let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
+ Ok(Self::new(conn))
+ }
+ }
+ impl<T> SchedulerGrpcClient<T>
+ where
+ T: tonic::client::GrpcService<tonic::body::BoxBody>,
+ T::Error: Into<StdError>,
+ T::ResponseBody: Body<Data = Bytes> + Send + 'static,
+ <T::ResponseBody as Body>::Error: Into<StdError> + Send,
+ {
+ pub fn new(inner: T) -> Self {
+ let inner = tonic::client::Grpc::new(inner);
+ Self { inner }
+ }
+ pub fn with_origin(inner: T, origin: Uri) -> Self {
+ let inner = tonic::client::Grpc::with_origin(inner, origin);
+ Self { inner }
+ }
+ pub fn with_interceptor<F>(
+ inner: T,
+ interceptor: F,
+ ) -> SchedulerGrpcClient<InterceptedService<T, F>>
+ where
+ F: tonic::service::Interceptor,
+ T::ResponseBody: Default,
+ T: tonic::codegen::Service<
+ http::Request<tonic::body::BoxBody>,
+ Response = http::Response<
+ <T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
+ >,
+ >,
+ <T as tonic::codegen::Service<
+ http::Request<tonic::body::BoxBody>,
+ >>::Error: Into<StdError> + Send + Sync,
+ {
+ SchedulerGrpcClient::new(InterceptedService::new(inner, interceptor))
+ }
+ /// Compress requests with the given encoding.
+ ///
+ /// This requires the server to support it otherwise it might respond with an
+ /// error.
+ #[must_use]
+ pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
+ self.inner = self.inner.send_compressed(encoding);
+ self
+ }
+ /// Enable decompressing responses.
+ #[must_use]
+ pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
+ self.inner = self.inner.accept_compressed(encoding);
+ self
+ }
+ /// Executors must poll the scheduler for heartbeat and to receive tasks
+ pub async fn poll_work(
+ &mut self,
+ request: impl tonic::IntoRequest<super::PollWorkParams>,
+ ) -> Result<tonic::Response<super::PollWorkResult>, tonic::Status> {
+ self.inner
+ .ready()
+ .await
+ .map_err(|e| {
+ tonic::Status::new(
+ tonic::Code::Unknown,
+ format!("Service was not ready: {}", e.into()),
+ )
+ })?;
+ let codec = tonic::codec::ProstCodec::default();
+ let path = http::uri::PathAndQuery::from_static(
+ "/ballista.protobuf.SchedulerGrpc/PollWork",
+ );
+ self.inner.unary(request.into_request(), path, codec).await
+ }
+ pub async fn register_executor(
+ &mut self,
+ request: impl tonic::IntoRequest<super::RegisterExecutorParams>,
+ ) -> Result<tonic::Response<super::RegisterExecutorResult>, tonic::Status> {
+ self.inner
+ .ready()
+ .await
+ .map_err(|e| {
+ tonic::Status::new(
+ tonic::Code::Unknown,
+ format!("Service was not ready: {}", e.into()),
+ )
+ })?;
+ let codec = tonic::codec::ProstCodec::default();
+ let path = http::uri::PathAndQuery::from_static(
+ "/ballista.protobuf.SchedulerGrpc/RegisterExecutor",
+ );
+ self.inner.unary(request.into_request(), path, codec).await
+ }
+ /// Push-based task scheduler will only leverage this interface
+ /// rather than the PollWork interface to report executor states
+ pub async fn heart_beat_from_executor(
+ &mut self,
+ request: impl tonic::IntoRequest<super::HeartBeatParams>,
+ ) -> Result<tonic::Response<super::HeartBeatResult>, tonic::Status> {
+ self.inner
+ .ready()
+ .await
+ .map_err(|e| {
+ tonic::Status::new(
+ tonic::Code::Unknown,
+ format!("Service was not ready: {}", e.into()),
+ )
+ })?;
+ let codec = tonic::codec::ProstCodec::default();
+ let path = http::uri::PathAndQuery::from_static(
+ "/ballista.protobuf.SchedulerGrpc/HeartBeatFromExecutor",
+ );
+ self.inner.unary(request.into_request(), path, codec).await
+ }
+ pub async fn update_task_status(
+ &mut self,
+ request: impl tonic::IntoRequest<super::UpdateTaskStatusParams>,
+ ) -> Result<tonic::Response<super::UpdateTaskStatusResult>, tonic::Status> {
+ self.inner
+ .ready()
+ .await
+ .map_err(|e| {
+ tonic::Status::new(
+ tonic::Code::Unknown,
+ format!("Service was not ready: {}", e.into()),
+ )
+ })?;
+ let codec = tonic::codec::ProstCodec::default();
+ let path = http::uri::PathAndQuery::from_static(
+ "/ballista.protobuf.SchedulerGrpc/UpdateTaskStatus",
+ );
+ self.inner.unary(request.into_request(), path, codec).await
+ }
+ pub async fn get_file_metadata(
+ &mut self,
+ request: impl tonic::IntoRequest<super::GetFileMetadataParams>,
+ ) -> Result<tonic::Response<super::GetFileMetadataResult>, tonic::Status> {
+ self.inner
+ .ready()
+ .await
+ .map_err(|e| {
+ tonic::Status::new(
+ tonic::Code::Unknown,
+ format!("Service was not ready: {}", e.into()),
+ )
+ })?;
+ let codec = tonic::codec::ProstCodec::default();
+ let path = http::uri::PathAndQuery::from_static(
+ "/ballista.protobuf.SchedulerGrpc/GetFileMetadata",
+ );
+ self.inner.unary(request.into_request(), path, codec).await
+ }
+ pub async fn execute_query(
+ &mut self,
+ request: impl tonic::IntoRequest<super::ExecuteQueryParams>,
+ ) -> Result<tonic::Response<super::ExecuteQueryResult>, tonic::Status> {
+ self.inner
+ .ready()
+ .await
+ .map_err(|e| {
+ tonic::Status::new(
+ tonic::Code::Unknown,
+ format!("Service was not ready: {}", e.into()),
+ )
+ })?;
+ let codec = tonic::codec::ProstCodec::default();
+ let path = http::uri::PathAndQuery::from_static(
+ "/ballista.protobuf.SchedulerGrpc/ExecuteQuery",
+ );
+ self.inner.unary(request.into_request(), path, codec).await
+ }
+ pub async fn get_job_status(
+ &mut self,
+ request: impl tonic::IntoRequest<super::GetJobStatusParams>,
+ ) -> Result<tonic::Response<super::GetJobStatusResult>, tonic::Status> {
+ self.inner
+ .ready()
+ .await
+ .map_err(|e| {
+ tonic::Status::new(
+ tonic::Code::Unknown,
+ format!("Service was not ready: {}", e.into()),
+ )
+ })?;
+ let codec = tonic::codec::ProstCodec::default();
+ let path = http::uri::PathAndQuery::from_static(
+ "/ballista.protobuf.SchedulerGrpc/GetJobStatus",
+ );
+ self.inner.unary(request.into_request(), path, codec).await
+ }
+ /// Used by Executor to tell Scheduler it is stopped.
+ pub async fn executor_stopped(
+ &mut self,
+ request: impl tonic::IntoRequest<super::ExecutorStoppedParams>,
+ ) -> Result<tonic::Response<super::ExecutorStoppedResult>, tonic::Status> {
+ self.inner
+ .ready()
+ .await
+ .map_err(|e| {
+ tonic::Status::new(
+ tonic::Code::Unknown,
+ format!("Service was not ready: {}", e.into()),
+ )
+ })?;
+ let codec = tonic::codec::ProstCodec::default();
+ let path = http::uri::PathAndQuery::from_static(
+ "/ballista.protobuf.SchedulerGrpc/ExecutorStopped",
+ );
+ self.inner.unary(request.into_request(), path, codec).await
+ }
+ pub async fn cancel_job(
+ &mut self,
+ request: impl tonic::IntoRequest<super::CancelJobParams>,
+ ) -> Result<tonic::Response<super::CancelJobResult>, tonic::Status> {
+ self.inner
+ .ready()
+ .await
+ .map_err(|e| {
+ tonic::Status::new(
+ tonic::Code::Unknown,
+ format!("Service was not ready: {}", e.into()),
+ )
+ })?;
+ let codec = tonic::codec::ProstCodec::default();
+ let path = http::uri::PathAndQuery::from_static(
+ "/ballista.protobuf.SchedulerGrpc/CancelJob",
+ );
+ self.inner.unary(request.into_request(), path, codec).await
+ }
+ pub async fn clean_job_data(
+ &mut self,
+ request: impl tonic::IntoRequest<super::CleanJobDataParams>,
+ ) -> Result<tonic::Response<super::CleanJobDataResult>, tonic::Status> {
+ self.inner
+ .ready()
+ .await
+ .map_err(|e| {
+ tonic::Status::new(
+ tonic::Code::Unknown,
+ format!("Service was not ready: {}", e.into()),
+ )
+ })?;
+ let codec = tonic::codec::ProstCodec::default();
+ let path = http::uri::PathAndQuery::from_static(
+ "/ballista.protobuf.SchedulerGrpc/CleanJobData",
+ );
+ self.inner.unary(request.into_request(), path, codec).await
+ }
+ }
+}
+/// Generated client implementations.
+pub mod executor_grpc_client {
+ #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
+ use tonic::codegen::*;
+ use tonic::codegen::http::Uri;
+ #[derive(Debug, Clone)]
+ pub struct ExecutorGrpcClient<T> {
+ inner: tonic::client::Grpc<T>,
+ }
+ impl ExecutorGrpcClient<tonic::transport::Channel> {
+ /// Attempt to create a new client by connecting to a given endpoint.
+ pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
+ where
+ D: std::convert::TryInto<tonic::transport::Endpoint>,
+ D::Error: Into<StdError>,
+ {
+ let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
+ Ok(Self::new(conn))
+ }
+ }
+ impl<T> ExecutorGrpcClient<T>
+ where
+ T: tonic::client::GrpcService<tonic::body::BoxBody>,
+ T::Error: Into<StdError>,
+ T::ResponseBody: Body<Data = Bytes> + Send + 'static,
+ <T::ResponseBody as Body>::Error: Into<StdError> + Send,
+ {
+ pub fn new(inner: T) -> Self {
+ let inner = tonic::client::Grpc::new(inner);
+ Self { inner }
+ }
+ pub fn with_origin(inner: T, origin: Uri) -> Self {
+ let inner = tonic::client::Grpc::with_origin(inner, origin);
+ Self { inner }
+ }
+ pub fn with_interceptor<F>(
+ inner: T,
+ interceptor: F,
+ ) -> ExecutorGrpcClient<InterceptedService<T, F>>
+ where
+ F: tonic::service::Interceptor,
+ T::ResponseBody: Default,
+ T: tonic::codegen::Service<
+ http::Request<tonic::body::BoxBody>,
+ Response = http::Response<
+ <T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
+ >,
+ >,
+ <T as tonic::codegen::Service<
+ http::Request<tonic::body::BoxBody>,
+ >>::Error: Into<StdError> + Send + Sync,
+ {
+ ExecutorGrpcClient::new(InterceptedService::new(inner, interceptor))
+ }
+ /// Compress requests with the given encoding.
+ ///
+ /// This requires the server to support it otherwise it might respond with an
+ /// error.
+ #[must_use]
+ pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
+ self.inner = self.inner.send_compressed(encoding);
+ self
+ }
+ /// Enable decompressing responses.
+ #[must_use]
+ pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
+ self.inner = self.inner.accept_compressed(encoding);
+ self
+ }
+ pub async fn launch_task(
+ &mut self,
+ request: impl tonic::IntoRequest<super::LaunchTaskParams>,
+ ) -> Result<tonic::Response<super::LaunchTaskResult>, tonic::Status> {
+ self.inner
+ .ready()
+ .await
+ .map_err(|e| {
+ tonic::Status::new(
+ tonic::Code::Unknown,
+ format!("Service was not ready: {}", e.into()),
+ )
+ })?;
+ let codec = tonic::codec::ProstCodec::default();
+ let path = http::uri::PathAndQuery::from_static(
+ "/ballista.protobuf.ExecutorGrpc/LaunchTask",
+ );
+ self.inner.unary(request.into_request(), path, codec).await
+ }
+ pub async fn launch_multi_task(
+ &mut self,
+ request: impl tonic::IntoRequest<super::LaunchMultiTaskParams>,
+ ) -> Result<tonic::Response<super::LaunchMultiTaskResult>, tonic::Status> {
+ self.inner
+ .ready()
+ .await
+ .map_err(|e| {
+ tonic::Status::new(
+ tonic::Code::Unknown,
+ format!("Service was not ready: {}", e.into()),
+ )
+ })?;
+ let codec = tonic::codec::ProstCodec::default();
+ let path = http::uri::PathAndQuery::from_static(
+ "/ballista.protobuf.ExecutorGrpc/LaunchMultiTask",
+ );
+ self.inner.unary(request.into_request(), path, codec).await
+ }
+ pub async fn stop_executor(
+ &mut self,
+ request: impl tonic::IntoRequest<super::StopExecutorParams>,
+ ) -> Result<tonic::Response<super::StopExecutorResult>, tonic::Status> {
+ self.inner
+ .ready()
+ .await
+ .map_err(|e| {
+ tonic::Status::new(
+ tonic::Code::Unknown,
+ format!("Service was not ready: {}", e.into()),
+ )
+ })?;
+ let codec = tonic::codec::ProstCodec::default();
+ let path = http::uri::PathAndQuery::from_static(
+ "/ballista.protobuf.ExecutorGrpc/StopExecutor",
+ );
+ self.inner.unary(request.into_request(), path, codec).await
+ }
+ pub async fn cancel_tasks(
+ &mut self,
+ request: impl tonic::IntoRequest<super::CancelTasksParams>,
+ ) -> Result<tonic::Response<super::CancelTasksResult>, tonic::Status> {
+ self.inner
+ .ready()
+ .await
+ .map_err(|e| {
+ tonic::Status::new(
+ tonic::Code::Unknown,
+ format!("Service was not ready: {}", e.into()),
+ )
+ })?;
+ let codec = tonic::codec::ProstCodec::default();
+ let path = http::uri::PathAndQuery::from_static(
+ "/ballista.protobuf.ExecutorGrpc/CancelTasks",
+ );
+ self.inner.unary(request.into_request(), path, codec).await
+ }
+ pub async fn remove_job_data(
+ &mut self,
+ request: impl tonic::IntoRequest<super::RemoveJobDataParams>,
+ ) -> Result<tonic::Response<super::RemoveJobDataResult>, tonic::Status> {
+ self.inner
+ .ready()
+ .await
+ .map_err(|e| {
+ tonic::Status::new(
+ tonic::Code::Unknown,
+ format!("Service was not ready: {}", e.into()),
+ )
+ })?;
+ let codec = tonic::codec::ProstCodec::default();
+ let path = http::uri::PathAndQuery::from_static(
+ "/ballista.protobuf.ExecutorGrpc/RemoveJobData",
+ );
+ self.inner.unary(request.into_request(), path, codec).await
+ }
+ }
+}
+/// Generated server implementations.
+pub mod scheduler_grpc_server {
+ #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
+ use tonic::codegen::*;
+ ///Generated trait containing gRPC methods that should be implemented for use with SchedulerGrpcServer.
+ #[async_trait]
+ pub trait SchedulerGrpc: Send + Sync + 'static {
+ /// Executors must poll the scheduler for heartbeat and to receive tasks
+ async fn poll_work(
+ &self,
+ request: tonic::Request<super::PollWorkParams>,
+ ) -> Result<tonic::Response<super::PollWorkResult>, tonic::Status>;
+ async fn register_executor(
+ &self,
+ request: tonic::Request<super::RegisterExecutorParams>,
+ ) -> Result<tonic::Response<super::RegisterExecutorResult>, tonic::Status>;
+ /// Push-based task scheduler will only leverage this interface
+ /// rather than the PollWork interface to report executor states
+ async fn heart_beat_from_executor(
+ &self,
+ request: tonic::Request<super::HeartBeatParams>,
+ ) -> Result<tonic::Response<super::HeartBeatResult>, tonic::Status>;
+ async fn update_task_status(
+ &self,
+ request: tonic::Request<super::UpdateTaskStatusParams>,
+ ) -> Result<tonic::Response<super::UpdateTaskStatusResult>, tonic::Status>;
+ async fn get_file_metadata(
+ &self,
+ request: tonic::Request<super::GetFileMetadataParams>,
+ ) -> Result<tonic::Response<super::GetFileMetadataResult>, tonic::Status>;
+ async fn execute_query(
+ &self,
+ request: tonic::Request<super::ExecuteQueryParams>,
+ ) -> Result<tonic::Response<super::ExecuteQueryResult>, tonic::Status>;
+ async fn get_job_status(
+ &self,
+ request: tonic::Request<super::GetJobStatusParams>,
+ ) -> Result<tonic::Response<super::GetJobStatusResult>, tonic::Status>;
+ /// Used by Executor to tell Scheduler it is stopped.
+ async fn executor_stopped(
+ &self,
+ request: tonic::Request<super::ExecutorStoppedParams>,
+ ) -> Result<tonic::Response<super::ExecutorStoppedResult>, tonic::Status>;
+ async fn cancel_job(
+ &self,
+ request: tonic::Request<super::CancelJobParams>,
+ ) -> Result<tonic::Response<super::CancelJobResult>, tonic::Status>;
+ async fn clean_job_data(
+ &self,
+ request: tonic::Request<super::CleanJobDataParams>,
+ ) -> Result<tonic::Response<super::CleanJobDataResult>, tonic::Status>;
+ }
+ #[derive(Debug)]
+ pub struct SchedulerGrpcServer<T: SchedulerGrpc> {
+ inner: _Inner<T>,
+ accept_compression_encodings: EnabledCompressionEncodings,
+ send_compression_encodings: EnabledCompressionEncodings,
+ }
+ struct _Inner<T>(Arc<T>);
+ impl<T: SchedulerGrpc> SchedulerGrpcServer<T> {
+ pub fn new(inner: T) -> Self {
+ Self::from_arc(Arc::new(inner))
+ }
+ pub fn from_arc(inner: Arc<T>) -> Self {
+ let inner = _Inner(inner);
+ Self {
+ inner,
+ accept_compression_encodings: Default::default(),
+ send_compression_encodings: Default::default(),
+ }
+ }
+ pub fn with_interceptor<F>(
+ inner: T,
+ interceptor: F,
+ ) -> InterceptedService<Self, F>
+ where
+ F: tonic::service::Interceptor,
+ {
+ InterceptedService::new(Self::new(inner), interceptor)
+ }
+ /// Enable decompressing requests with the given encoding.
+ #[must_use]
+ pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
+ self.accept_compression_encodings.enable(encoding);
+ self
+ }
+ /// Compress responses with the given encoding, if the client supports it.
+ #[must_use]
+ pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
+ self.send_compression_encodings.enable(encoding);
+ self
+ }
+ }
+ impl<T, B> tonic::codegen::Service<http::Request<B>> for SchedulerGrpcServer<T>
+ where
+ T: SchedulerGrpc,
+ B: Body + Send + 'static,
+ B::Error: Into<StdError> + Send + 'static,
+ {
+ type Response = http::Response<tonic::body::BoxBody>;
+ type Error = std::convert::Infallible;
+ type Future = BoxFuture<Self::Response, Self::Error>;
+ fn poll_ready(
+ &mut self,
+ _cx: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+ fn call(&mut self, req: http::Request<B>) -> Self::Future {
+ let inner = self.inner.clone();
+ match req.uri().path() {
+ "/ballista.protobuf.SchedulerGrpc/PollWork" => {
+ #[allow(non_camel_case_types)]
+ struct PollWorkSvc<T: SchedulerGrpc>(pub Arc<T>);
+ impl<
+ T: SchedulerGrpc,
+ > tonic::server::UnaryService<super::PollWorkParams>
+ for PollWorkSvc<T> {
+ type Response = super::PollWorkResult;
+ type Future = BoxFuture<
+ tonic::Response<Self::Response>,
+ tonic::Status,
+ >;
+ fn call(
+ &mut self,
+ request: tonic::Request<super::PollWorkParams>,
+ ) -> Self::Future {
+ let inner = self.0.clone();
+ let fut = async move { (*inner).poll_work(request).await };
+ Box::pin(fut)
+ }
+ }
+ let accept_compression_encodings = self.accept_compression_encodings;
+ let send_compression_encodings = self.send_compression_encodings;
+ let inner = self.inner.clone();
+ let fut = async move {
+ let inner = inner.0;
+ let method = PollWorkSvc(inner);
+ let codec = tonic::codec::ProstCodec::default();
+ let mut grpc = tonic::server::Grpc::new(codec)
+ .apply_compression_config(
+ accept_compression_encodings,
+ send_compression_encodings,
+ );
+ let res = grpc.unary(method, req).await;
+ Ok(res)
+ };
+ Box::pin(fut)
+ }
+ "/ballista.protobuf.SchedulerGrpc/RegisterExecutor" => {
+ #[allow(non_camel_case_types)]
+ struct RegisterExecutorSvc<T: SchedulerGrpc>(pub Arc<T>);
+ impl<
+ T: SchedulerGrpc,
+ > tonic::server::UnaryService<super::RegisterExecutorParams>
+ for RegisterExecutorSvc<T> {
+ type Response = super::RegisterExecutorResult;
+ type Future = BoxFuture<
+ tonic::Response<Self::Response>,
+ tonic::Status,
+ >;
+ fn call(
+ &mut self,
+ request: tonic::Request<super::RegisterExecutorParams>,
+ ) -> Self::Future {
+ let inner = self.0.clone();
+ let fut = async move {
+ (*inner).register_executor(request).await
+ };
+ Box::pin(fut)
+ }
+ }
+ let accept_compression_encodings = self.accept_compression_encodings;
+ let send_compression_encodings = self.send_compression_encodings;
+ let inner = self.inner.clone();
+ let fut = async move {
+ let inner = inner.0;
+ let method = RegisterExecutorSvc(inner);
+ let codec = tonic::codec::ProstCodec::default();
+ let mut grpc = tonic::server::Grpc::new(codec)
+ .apply_compression_config(
+ accept_compression_encodings,
+ send_compression_encodings,
+ );
+ let res = grpc.unary(method, req).await;
+ Ok(res)
+ };
+ Box::pin(fut)
+ }
+ "/ballista.protobuf.SchedulerGrpc/HeartBeatFromExecutor" => {
+ #[allow(non_camel_case_types)]
+ struct HeartBeatFromExecutorSvc<T: SchedulerGrpc>(pub Arc<T>);
+ impl<
+ T: SchedulerGrpc,
+ > tonic::server::UnaryService<super::HeartBeatParams>
+ for HeartBeatFromExecutorSvc<T> {
+ type Response = super::HeartBeatResult;
+ type Future = BoxFuture<
+ tonic::Response<Self::Response>,
+ tonic::Status,
+ >;
+ fn call(
+ &mut self,
+ request: tonic::Request<super::HeartBeatParams>,
+ ) -> Self::Future {
+ let inner = self.0.clone();
+ let fut = async move {
+ (*inner).heart_beat_from_executor(request).await
+ };
+ Box::pin(fut)
+ }
+ }
+ let accept_compression_encodings = self.accept_compression_encodings;
+ let send_compression_encodings = self.send_compression_encodings;
+ let inner = self.inner.clone();
+ let fut = async move {
+ let inner = inner.0;
+ let method = HeartBeatFromExecutorSvc(inner);
+ let codec = tonic::codec::ProstCodec::default();
+ let mut grpc = tonic::server::Grpc::new(codec)
+ .apply_compression_config(
+ accept_compression_encodings,
+ send_compression_encodings,
+ );
+ let res = grpc.unary(method, req).await;
+ Ok(res)
+ };
+ Box::pin(fut)
+ }
+ "/ballista.protobuf.SchedulerGrpc/UpdateTaskStatus" => {
+ #[allow(non_camel_case_types)]
+ struct UpdateTaskStatusSvc<T: SchedulerGrpc>(pub Arc<T>);
+ impl<
+ T: SchedulerGrpc,
+ > tonic::server::UnaryService<super::UpdateTaskStatusParams>
+ for UpdateTaskStatusSvc<T> {
+ type Response = super::UpdateTaskStatusResult;
+ type Future = BoxFuture<
+ tonic::Response<Self::Response>,
+ tonic::Status,
+ >;
+ fn call(
+ &mut self,
+ request: tonic::Request<super::UpdateTaskStatusParams>,
+ ) -> Self::Future {
+ let inner = self.0.clone();
+ let fut = async move {
+ (*inner).update_task_status(request).await
+ };
+ Box::pin(fut)
+ }
+ }
+ let accept_compression_encodings = self.accept_compression_encodings;
+ let send_compression_encodings = self.send_compression_encodings;
+ let inner = self.inner.clone();
+ let fut = async move {
+ let inner = inner.0;
+ let method = UpdateTaskStatusSvc(inner);
+ let codec = tonic::codec::ProstCodec::default();
+ let mut grpc = tonic::server::Grpc::new(codec)
+ .apply_compression_config(
+ accept_compression_encodings,
+ send_compression_encodings,
+ );
+ let res = grpc.unary(method, req).await;
+ Ok(res)
+ };
+ Box::pin(fut)
+ }
+ "/ballista.protobuf.SchedulerGrpc/GetFileMetadata" => {
+ #[allow(non_camel_case_types)]
+ struct GetFileMetadataSvc<T: SchedulerGrpc>(pub Arc<T>);
+ impl<
+ T: SchedulerGrpc,
+ > tonic::server::UnaryService<super::GetFileMetadataParams>
+ for GetFileMetadataSvc<T> {
+ type Response = super::GetFileMetadataResult;
+ type Future = BoxFuture<
+ tonic::Response<Self::Response>,
+ tonic::Status,
+ >;
+ fn call(
+ &mut self,
+ request: tonic::Request<super::GetFileMetadataParams>,
+ ) -> Self::Future {
+ let inner = self.0.clone();
+ let fut = async move {
+ (*inner).get_file_metadata(request).await
+ };
+ Box::pin(fut)
+ }
+ }
+ let accept_compression_encodings = self.accept_compression_encodings;
+ let send_compression_encodings = self.send_compression_encodings;
+ let inner = self.inner.clone();
+ let fut = async move {
+ let inner = inner.0;
+ let method = GetFileMetadataSvc(inner);
+ let codec = tonic::codec::ProstCodec::default();
+ let mut grpc = tonic::server::Grpc::new(codec)
+ .apply_compression_config(
+ accept_compression_encodings,
+ send_compression_encodings,
+ );
+ let res = grpc.unary(method, req).await;
+ Ok(res)
+ };
+ Box::pin(fut)
+ }
+ "/ballista.protobuf.SchedulerGrpc/ExecuteQuery" => {
+ #[allow(non_camel_case_types)]
+ struct ExecuteQuerySvc<T: SchedulerGrpc>(pub Arc<T>);
+ impl<
+ T: SchedulerGrpc,
+ > tonic::server::UnaryService<super::ExecuteQueryParams>
+ for ExecuteQuerySvc<T> {
+ type Response = super::ExecuteQueryResult;
+ type Future = BoxFuture<
+ tonic::Response<Self::Response>,
+ tonic::Status,
+ >;
+ fn call(
+ &mut self,
+ request: tonic::Request<super::ExecuteQueryParams>,
+ ) -> Self::Future {
+ let inner = self.0.clone();
+ let fut = async move {
+ (*inner).execute_query(request).await
+ };
+ Box::pin(fut)
+ }
+ }
+ let accept_compression_encodings = self.accept_compression_encodings;
+ let send_compression_encodings = self.send_compression_encodings;
+ let inner = self.inner.clone();
+ let fut = async move {
+ let inner = inner.0;
+ let method = ExecuteQuerySvc(inner);
+ let codec = tonic::codec::ProstCodec::default();
+ let mut grpc = tonic::server::Grpc::new(codec)
+ .apply_compression_config(
+ accept_compression_encodings,
+ send_compression_encodings,
+ );
+ let res = grpc.unary(method, req).await;
+ Ok(res)
+ };
+ Box::pin(fut)
+ }
+ "/ballista.protobuf.SchedulerGrpc/GetJobStatus" => {
+ #[allow(non_camel_case_types)]
+ struct GetJobStatusSvc<T: SchedulerGrpc>(pub Arc<T>);
+ impl<
+ T: SchedulerGrpc,
+ > tonic::server::UnaryService<super::GetJobStatusParams>
+ for GetJobStatusSvc<T> {
+ type Response = super::GetJobStatusResult;
+ type Future = BoxFuture<
+ tonic::Response<Self::Response>,
+ tonic::Status,
+ >;
+ fn call(
+ &mut self,
+ request: tonic::Request<super::GetJobStatusParams>,
+ ) -> Self::Future {
+ let inner = self.0.clone();
+ let fut = async move {
+ (*inner).get_job_status(request).await
+ };
+ Box::pin(fut)
+ }
+ }
+ let accept_compression_encodings = self.accept_compression_encodings;
+ let send_compression_encodings = self.send_compression_encodings;
+ let inner = self.inner.clone();
+ let fut = async move {
+ let inner = inner.0;
+ let method = GetJobStatusSvc(inner);
+ let codec = tonic::codec::ProstCodec::default();
+ let mut grpc = tonic::server::Grpc::new(codec)
+ .apply_compression_config(
+ accept_compression_encodings,
+ send_compression_encodings,
+ );
+ let res = grpc.unary(method, req).await;
+ Ok(res)
+ };
+ Box::pin(fut)
+ }
+ "/ballista.protobuf.SchedulerGrpc/ExecutorStopped" => {
+ #[allow(non_camel_case_types)]
+ struct ExecutorStoppedSvc<T: SchedulerGrpc>(pub Arc<T>);
+ impl<
+ T: SchedulerGrpc,
+ > tonic::server::UnaryService<super::ExecutorStoppedParams>
+ for ExecutorStoppedSvc<T> {
+ type Response = super::ExecutorStoppedResult;
+ type Future = BoxFuture<
+ tonic::Response<Self::Response>,
+ tonic::Status,
+ >;
+ fn call(
+ &mut self,
+ request: tonic::Request<super::ExecutorStoppedParams>,
+ ) -> Self::Future {
+ let inner = self.0.clone();
+ let fut = async move {
+ (*inner).executor_stopped(request).await
+ };
+ Box::pin(fut)
+ }
+ }
+ let accept_compression_encodings = self.accept_compression_encodings;
+ let send_compression_encodings = self.send_compression_encodings;
+ let inner = self.inner.clone();
+ let fut = async move {
+ let inner = inner.0;
+ let method = ExecutorStoppedSvc(inner);
+ let codec = tonic::codec::ProstCodec::default();
+ let mut grpc = tonic::server::Grpc::new(codec)
+ .apply_compression_config(
+ accept_compression_encodings,
+ send_compression_encodings,
+ );
+ let res = grpc.unary(method, req).await;
+ Ok(res)
+ };
+ Box::pin(fut)
+ }
+ "/ballista.protobuf.SchedulerGrpc/CancelJob" => {
+ #[allow(non_camel_case_types)]
+ struct CancelJobSvc<T: SchedulerGrpc>(pub Arc<T>);
+ impl<
+ T: SchedulerGrpc,
+ > tonic::server::UnaryService<super::CancelJobParams>
+ for CancelJobSvc<T> {
+ type Response = super::CancelJobResult;
+ type Future = BoxFuture<
+ tonic::Response<Self::Response>,
+ tonic::Status,
+ >;
+ fn call(
+ &mut self,
+ request: tonic::Request<super::CancelJobParams>,
+ ) -> Self::Future {
+ let inner = self.0.clone();
+ let fut = async move { (*inner).cancel_job(request).await };
+ Box::pin(fut)
+ }
+ }
+ let accept_compression_encodings = self.accept_compression_encodings;
+ let send_compression_encodings = self.send_compression_encodings;
+ let inner = self.inner.clone();
+ let fut = async move {
+ let inner = inner.0;
+ let method = CancelJobSvc(inner);
+ let codec = tonic::codec::ProstCodec::default();
+ let mut grpc = tonic::server::Grpc::new(codec)
+ .apply_compression_config(
+ accept_compression_encodings,
+ send_compression_encodings,
+ );
+ let res = grpc.unary(method, req).await;
+ Ok(res)
+ };
+ Box::pin(fut)
+ }
+ "/ballista.protobuf.SchedulerGrpc/CleanJobData" => {
+ #[allow(non_camel_case_types)]
+ struct CleanJobDataSvc<T: SchedulerGrpc>(pub Arc<T>);
+ impl<
+ T: SchedulerGrpc,
+ > tonic::server::UnaryService<super::CleanJobDataParams>
+ for CleanJobDataSvc<T> {
+ type Response = super::CleanJobDataResult;
+ type Future = BoxFuture<
+ tonic::Response<Self::Response>,
+ tonic::Status,
+ >;
+ fn call(
+ &mut self,
+ request: tonic::Request<super::CleanJobDataParams>,
+ ) -> Self::Future {
+ let inner = self.0.clone();
+ let fut = async move {
+ (*inner).clean_job_data(request).await
+ };
+ Box::pin(fut)
+ }
+ }
+ let accept_compression_encodings = self.accept_compression_encodings;
+ let send_compression_encodings = self.send_compression_encodings;
+ let inner = self.inner.clone();
+ let fut = async move {
+ let inner = inner.0;
+ let method = CleanJobDataSvc(inner);
+ let codec = tonic::codec::ProstCodec::default();
+ let mut grpc = tonic::server::Grpc::new(codec)
+ .apply_compression_config(
+ accept_compression_encodings,
+ send_compression_encodings,
+ );
+ let res = grpc.unary(method, req).await;
+ Ok(res)
+ };
+ Box::pin(fut)
+ }
+ _ => {
+ Box::pin(async move {
+ Ok(
+ http::Response::builder()
+ .status(200)
+ .header("grpc-status", "12")
+ .header("content-type", "application/grpc")
+ .body(empty_body())
+ .unwrap(),
+ )
+ })
+ }
+ }
+ }
+ }
+ impl<T: SchedulerGrpc> Clone for SchedulerGrpcServer<T> {
+ fn clone(&self) -> Self {
+ let inner = self.inner.clone();
+ Self {
+ inner,
+ accept_compression_encodings: self.accept_compression_encodings,
+ send_compression_encodings: self.send_compression_encodings,
+ }
+ }
+ }
+ impl<T: SchedulerGrpc> Clone for _Inner<T> {
+ fn clone(&self) -> Self {
+ Self(self.0.clone())
+ }
+ }
+ impl<T: std::fmt::Debug> std::fmt::Debug for _Inner<T> {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "{:?}", self.0)
+ }
+ }
+ impl<T: SchedulerGrpc> tonic::server::NamedService for SchedulerGrpcServer<T> {
+ const NAME: &'static str = "ballista.protobuf.SchedulerGrpc";
+ }
+}
+/// Generated server implementations.
+pub mod executor_grpc_server {
+ #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
+ use tonic::codegen::*;
+ ///Generated trait containing gRPC methods that should be implemented for use with ExecutorGrpcServer.
+ #[async_trait]
+ pub trait ExecutorGrpc: Send + Sync + 'static {
+ async fn launch_task(
+ &self,
+ request: tonic::Request<super::LaunchTaskParams>,
+ ) -> Result<tonic::Response<super::LaunchTaskResult>, tonic::Status>;
+ async fn launch_multi_task(
+ &self,
+ request: tonic::Request<super::LaunchMultiTaskParams>,
+ ) -> Result<tonic::Response<super::LaunchMultiTaskResult>, tonic::Status>;
+ async fn stop_executor(
+ &self,
+ request: tonic::Request<super::StopExecutorParams>,
+ ) -> Result<tonic::Response<super::StopExecutorResult>, tonic::Status>;
+ async fn cancel_tasks(
+ &self,
+ request: tonic::Request<super::CancelTasksParams>,
+ ) -> Result<tonic::Response<super::CancelTasksResult>, tonic::Status>;
+ async fn remove_job_data(
+ &self,
+ request: tonic::Request<super::RemoveJobDataParams>,
+ ) -> Result<tonic::Response<super::RemoveJobDataResult>, tonic::Status>;
+ }
+ #[derive(Debug)]
+ pub struct ExecutorGrpcServer<T: ExecutorGrpc> {
+ inner: _Inner<T>,
+ accept_compression_encodings: EnabledCompressionEncodings,
+ send_compression_encodings: EnabledCompressionEncodings,
+ }
+ struct _Inner<T>(Arc<T>);
+ impl<T: ExecutorGrpc> ExecutorGrpcServer<T> {
+ pub fn new(inner: T) -> Self {
+ Self::from_arc(Arc::new(inner))
+ }
+ pub fn from_arc(inner: Arc<T>) -> Self {
+ let inner = _Inner(inner);
+ Self {
+ inner,
+ accept_compression_encodings: Default::default(),
+ send_compression_encodings: Default::default(),
+ }
+ }
+ pub fn with_interceptor<F>(
+ inner: T,
+ interceptor: F,
+ ) -> InterceptedService<Self, F>
+ where
+ F: tonic::service::Interceptor,
+ {
+ InterceptedService::new(Self::new(inner), interceptor)
+ }
+ /// Enable decompressing requests with the given encoding.
+ #[must_use]
+ pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
+ self.accept_compression_encodings.enable(encoding);
+ self
+ }
+ /// Compress responses with the given encoding, if the client supports it.
+ #[must_use]
+ pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
+ self.send_compression_encodings.enable(encoding);
+ self
+ }
+ }
+ impl<T, B> tonic::codegen::Service<http::Request<B>> for ExecutorGrpcServer<T>
+ where
+ T: ExecutorGrpc,
+ B: Body + Send + 'static,
+ B::Error: Into<StdError> + Send + 'static,
+ {
+ type Response = http::Response<tonic::body::BoxBody>;
+ type Error = std::convert::Infallible;
+ type Future = BoxFuture<Self::Response, Self::Error>;
+ fn poll_ready(
+ &mut self,
+ _cx: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+ fn call(&mut self, req: http::Request<B>) -> Self::Future {
+ let inner = self.inner.clone();
+ match req.uri().path() {
+ "/ballista.protobuf.ExecutorGrpc/LaunchTask" => {
+ #[allow(non_camel_case_types)]
+ struct LaunchTaskSvc<T: ExecutorGrpc>(pub Arc<T>);
+ impl<
+ T: ExecutorGrpc,
+ > tonic::server::UnaryService<super::LaunchTaskParams>
+ for LaunchTaskSvc<T> {
+ type Response = super::LaunchTaskResult;
+ type Future = BoxFuture<
+ tonic::Response<Self::Response>,
+ tonic::Status,
+ >;
+ fn call(
+ &mut self,
+ request: tonic::Request<super::LaunchTaskParams>,
+ ) -> Self::Future {
+ let inner = self.0.clone();
+ let fut = async move { (*inner).launch_task(request).await };
+ Box::pin(fut)
+ }
+ }
+ let accept_compression_encodings = self.accept_compression_encodings;
+ let send_compression_encodings = self.send_compression_encodings;
+ let inner = self.inner.clone();
+ let fut = async move {
+ let inner = inner.0;
+ let method = LaunchTaskSvc(inner);
+ let codec = tonic::codec::ProstCodec::default();
+ let mut grpc = tonic::server::Grpc::new(codec)
+ .apply_compression_config(
+ accept_compression_encodings,
+ send_compression_encodings,
+ );
+ let res = grpc.unary(method, req).await;
+ Ok(res)
+ };
+ Box::pin(fut)
+ }
+ "/ballista.protobuf.ExecutorGrpc/LaunchMultiTask" => {
+ #[allow(non_camel_case_types)]
+ struct LaunchMultiTaskSvc<T: ExecutorGrpc>(pub Arc<T>);
+ impl<
+ T: ExecutorGrpc,
+ > tonic::server::UnaryService<super::LaunchMultiTaskParams>
+ for LaunchMultiTaskSvc<T> {
+ type Response = super::LaunchMultiTaskResult;
+ type Future = BoxFuture<
+ tonic::Response<Self::Response>,
+ tonic::Status,
+ >;
+ fn call(
+ &mut self,
+ request: tonic::Request<super::LaunchMultiTaskParams>,
+ ) -> Self::Future {
+ let inner = self.0.clone();
+ let fut = async move {
+ (*inner).launch_multi_task(request).await
+ };
+ Box::pin(fut)
+ }
+ }
+ let accept_compression_encodings = self.accept_compression_encodings;
+ let send_compression_encodings = self.send_compression_encodings;
+ let inner = self.inner.clone();
+ let fut = async move {
+ let inner = inner.0;
+ let method = LaunchMultiTaskSvc(inner);
+ let codec = tonic::codec::ProstCodec::default();
+ let mut grpc = tonic::server::Grpc::new(codec)
+ .apply_compression_config(
+ accept_compression_encodings,
+ send_compression_encodings,
+ );
+ let res = grpc.unary(method, req).await;
+ Ok(res)
+ };
+ Box::pin(fut)
+ }
+ "/ballista.protobuf.ExecutorGrpc/StopExecutor" => {
+ #[allow(non_camel_case_types)]
+ struct StopExecutorSvc<T: ExecutorGrpc>(pub Arc<T>);
+ impl<
+ T: ExecutorGrpc,
+ > tonic::server::UnaryService<super::StopExecutorParams>
+ for StopExecutorSvc<T> {
+ type Response = super::StopExecutorResult;
+ type Future = BoxFuture<
+ tonic::Response<Self::Response>,
+ tonic::Status,
+ >;
+ fn call(
+ &mut self,
+ request: tonic::Request<super::StopExecutorParams>,
+ ) -> Self::Future {
+ let inner = self.0.clone();
+ let fut = async move {
+ (*inner).stop_executor(request).await
+ };
+ Box::pin(fut)
+ }
+ }
+ let accept_compression_encodings = self.accept_compression_encodings;
+ let send_compression_encodings = self.send_compression_encodings;
+ let inner = self.inner.clone();
+ let fut = async move {
+ let inner = inner.0;
+ let method = StopExecutorSvc(inner);
+ let codec = tonic::codec::ProstCodec::default();
+ let mut grpc = tonic::server::Grpc::new(codec)
+ .apply_compression_config(
+ accept_compression_encodings,
+ send_compression_encodings,
+ );
+ let res = grpc.unary(method, req).await;
+ Ok(res)
+ };
+ Box::pin(fut)
+ }
+ "/ballista.protobuf.ExecutorGrpc/CancelTasks" => {
+ #[allow(non_camel_case_types)]
+ struct CancelTasksSvc<T: ExecutorGrpc>(pub Arc<T>);
+ impl<
+ T: ExecutorGrpc,
+ > tonic::server::UnaryService<super::CancelTasksParams>
+ for CancelTasksSvc<T> {
+ type Response = super::CancelTasksResult;
+ type Future = BoxFuture<
+ tonic::Response<Self::Response>,
+ tonic::Status,
+ >;
+ fn call(
+ &mut self,
+ request: tonic::Request<super::CancelTasksParams>,
+ ) -> Self::Future {
+ let inner = self.0.clone();
+ let fut = async move {
+ (*inner).cancel_tasks(request).await
+ };
+ Box::pin(fut)
+ }
+ }
+ let accept_compression_encodings = self.accept_compression_encodings;
+ let send_compression_encodings = self.send_compression_encodings;
+ let inner = self.inner.clone();
+ let fut = async move {
+ let inner = inner.0;
+ let method = CancelTasksSvc(inner);
+ let codec = tonic::codec::ProstCodec::default();
+ let mut grpc = tonic::server::Grpc::new(codec)
+ .apply_compression_config(
+ accept_compression_encodings,
+ send_compression_encodings,
+ );
+ let res = grpc.unary(method, req).await;
+ Ok(res)
+ };
+ Box::pin(fut)
+ }
+ "/ballista.protobuf.ExecutorGrpc/RemoveJobData" => {
+ #[allow(non_camel_case_types)]
+ struct RemoveJobDataSvc<T: ExecutorGrpc>(pub Arc<T>);
+ impl<
+ T: ExecutorGrpc,
+ > tonic::server::UnaryService<super::RemoveJobDataParams>
+ for RemoveJobDataSvc<T> {
+ type Response = super::RemoveJobDataResult;
+ type Future = BoxFuture<
+ tonic::Response<Self::Response>,
+ tonic::Status,
+ >;
+ fn call(
+ &mut self,
+ request: tonic::Request<super::RemoveJobDataParams>,
+ ) -> Self::Future {
+ let inner = self.0.clone();
+ let fut = async move {
+ (*inner).remove_job_data(request).await
+ };
+ Box::pin(fut)
+ }
+ }
+ let accept_compression_encodings = self.accept_compression_encodings;
+ let send_compression_encodings = self.send_compression_encodings;
+ let inner = self.inner.clone();
+ let fut = async move {
+ let inner = inner.0;
+ let method = RemoveJobDataSvc(inner);
+ let codec = tonic::codec::ProstCodec::default();
+ let mut grpc = tonic::server::Grpc::new(codec)
+ .apply_compression_config(
+ accept_compression_encodings,
+ send_compression_encodings,
+ );
+ let res = grpc.unary(method, req).await;
+ Ok(res)
+ };
+ Box::pin(fut)
+ }
+ _ => {
+ Box::pin(async move {
+ Ok(
+ http::Response::builder()
+ .status(200)
+ .header("grpc-status", "12")
+ .header("content-type", "application/grpc")
+ .body(empty_body())
+ .unwrap(),
+ )
+ })
+ }
+ }
+ }
+ }
+ impl<T: ExecutorGrpc> Clone for ExecutorGrpcServer<T> {
+ fn clone(&self) -> Self {
+ let inner = self.inner.clone();
+ Self {
+ inner,
+ accept_compression_encodings: self.accept_compression_encodings,
+ send_compression_encodings: self.send_compression_encodings,
+ }
+ }
+ }
+ impl<T: ExecutorGrpc> Clone for _Inner<T> {
+ fn clone(&self) -> Self {
+ Self(self.0.clone())
+ }
+ }
+ impl<T: std::fmt::Debug> std::fmt::Debug for _Inner<T> {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "{:?}", self.0)
+ }
+ }
+ impl<T: ExecutorGrpc> tonic::server::NamedService for ExecutorGrpcServer<T> {
+ const NAME: &'static str = "ballista.protobuf.ExecutorGrpc";
+ }
+}
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 5a899605..5752359a 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -25,7 +25,7 @@ homepage = "https://github.com/apache/arrow-ballista"
repository = "https://github.com/apache/arrow-ballista"
license = "Apache-2.0"
publish = false
-rust-version = "1.59"
+rust-version = "1.63"
[features]
default = ["mimalloc"]
diff --git a/dev/release/rat_exclude_files.txt b/dev/release/rat_exclude_files.txt
index ea641170..7bd9d681 100644
--- a/dev/release/rat_exclude_files.txt
+++ b/dev/release/rat_exclude_files.txt
@@ -53,3 +53,4 @@ Cargo.lock
.history
parquet-testing/*
*rat.txt
+ballista/core/src/serde/generated/ballista.rs
\ No newline at end of file
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index bd1acd99..7d24401c 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -26,7 +26,7 @@ license = "Apache-2.0"
keywords = [ "arrow", "distributed", "query", "sql" ]
edition = "2021"
publish = false
-rust-version = "1.59"
+rust-version = "1.63"
[[example]]
name = "standalone_sql"