You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/06/04 15:18:16 UTC
[arrow-ballista] branch master updated: Delegate to `datafusion-proto` for logical plan serde (#57)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new 9b1394e9 Delegate to `datafusion-proto` for logical plan serde (#57)
9b1394e9 is described below
commit 9b1394e9748b99416f9c760e0e0cfdfbeddb5b99
Author: Andy Grove <an...@nvidia.com>
AuthorDate: Sat Jun 4 09:18:12 2022 -0600
Delegate to `datafusion-proto` for logical plan serde (#57)
---
ballista/rust/client/Cargo.toml | 2 +-
ballista/rust/client/src/context.rs | 3 +-
.../core/src/execution_plans/distributed_query.rs | 29 +-
ballista/rust/core/src/serde/logical_plan/mod.rs | 1024 +-------------------
ballista/rust/core/src/serde/mod.rs | 104 +-
ballista/rust/core/src/serde/physical_plan/mod.rs | 3 +-
ballista/rust/core/src/utils.rs | 25 +-
ballista/rust/executor/Cargo.toml | 1 +
ballista/rust/executor/src/execution_loop.rs | 30 +-
ballista/rust/executor/src/executor_server.rs | 3 +-
ballista/rust/executor/src/main.rs | 3 +-
ballista/rust/executor/src/standalone.rs | 12 +-
ballista/rust/scheduler/Cargo.toml | 1 +
ballista/rust/scheduler/src/api/handlers.rs | 3 +-
ballista/rust/scheduler/src/api/mod.rs | 3 +-
ballista/rust/scheduler/src/main.rs | 5 +-
ballista/rust/scheduler/src/planner.rs | 3 +-
.../scheduler/src/scheduler_server/event_loop.rs | 10 +-
.../src/scheduler_server/external_scaler.rs | 3 +-
.../rust/scheduler/src/scheduler_server/grpc.rs | 19 +-
.../rust/scheduler/src/scheduler_server/mod.rs | 30 +-
.../src/scheduler_server/query_stage_scheduler.rs | 23 +-
ballista/rust/scheduler/src/standalone.rs | 3 +-
ballista/rust/scheduler/src/state/mod.rs | 24 +-
.../rust/scheduler/src/state/persistent_state.rs | 29 +-
.../rust/scheduler/src/state/task_scheduler.rs | 3 +-
benchmarks/Cargo.toml | 1 +
benchmarks/src/bin/tpch.rs | 51 +-
28 files changed, 183 insertions(+), 1267 deletions(-)
diff --git a/ballista/rust/client/Cargo.toml b/ballista/rust/client/Cargo.toml
index 65f9a24f..b9138c71 100644
--- a/ballista/rust/client/Cargo.toml
+++ b/ballista/rust/client/Cargo.toml
@@ -31,8 +31,8 @@ rust-version = "1.59"
ballista-core = { path = "../core", version = "0.7.0" }
ballista-executor = { path = "../executor", version = "0.7.0", optional = true }
ballista-scheduler = { path = "../scheduler", version = "0.7.0", optional = true }
-
datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
futures = "0.3"
log = "0.4"
parking_lot = "0.12"
diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs
index 4209af5e..09ff9cee 100644
--- a/ballista/rust/client/src/context.rs
+++ b/ballista/rust/client/src/context.rs
@@ -27,8 +27,9 @@ use std::sync::Arc;
use ballista_core::config::BallistaConfig;
use ballista_core::serde::protobuf::scheduler_grpc_client::SchedulerGrpcClient;
-use ballista_core::serde::protobuf::{ExecuteQueryParams, KeyValuePair, LogicalPlanNode};
+use ballista_core::serde::protobuf::{ExecuteQueryParams, KeyValuePair};
use ballista_core::utils::create_df_ctx_with_ballista_query_planner;
+use datafusion_proto::protobuf::LogicalPlanNode;
use datafusion::catalog::TableReference;
use datafusion::dataframe::DataFrame;
diff --git a/ballista/rust/core/src/execution_plans/distributed_query.rs b/ballista/rust/core/src/execution_plans/distributed_query.rs
index 0b20acb4..62e7ff02 100644
--- a/ballista/rust/core/src/execution_plans/distributed_query.rs
+++ b/ballista/rust/core/src/execution_plans/distributed_query.rs
@@ -15,38 +15,35 @@
// specific language governing permissions and limitations
// under the License.
-use std::any::Any;
-
-use std::fmt::Debug;
-use std::marker::PhantomData;
-
-use std::sync::Arc;
-use std::time::Duration;
-
use crate::client::BallistaClient;
use crate::config::BallistaConfig;
+use crate::serde::protobuf::execute_query_params::OptionalSessionId;
use crate::serde::protobuf::{
execute_query_params::Query, job_status, scheduler_grpc_client::SchedulerGrpcClient,
ExecuteQueryParams, GetJobStatusParams, GetJobStatusResult, KeyValuePair,
PartitionLocation,
};
-
use datafusion::arrow::datatypes::SchemaRef;
+use datafusion::arrow::error::{ArrowError, Result as ArrowResult};
+use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError, Result};
+use datafusion::execution::context::TaskContext;
use datafusion::logical_plan::LogicalPlan;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
};
-
-use crate::serde::protobuf::execute_query_params::OptionalSessionId;
-use crate::serde::{AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec};
-use datafusion::arrow::error::{ArrowError, Result as ArrowResult};
-use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::execution::context::TaskContext;
-use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+use datafusion_proto::logical_plan::{
+ AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
+};
use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt};
use log::{error, info};
+use std::any::Any;
+use std::fmt::Debug;
+use std::marker::PhantomData;
+use std::sync::Arc;
+use std::time::Duration;
/// This operator sends a logical plan to a Ballista scheduler for execution and
/// polls the scheduler until the query is complete and then fetches the resulting
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index be76e538..c4532711 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -15,1014 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-use crate::error::BallistaError;
-use crate::serde::protobuf::LogicalExtensionNode;
-use crate::serde::{
- byte_to_string, proto_error, protobuf, str_to_byte, AsLogicalPlan,
- LogicalExtensionCodec,
-};
-use crate::{convert_required, into_logical_plan};
-use datafusion::arrow::datatypes::Schema;
-use datafusion::datasource::file_format::avro::AvroFormat;
-use datafusion::datasource::file_format::csv::CsvFormat;
-use datafusion::datasource::file_format::parquet::ParquetFormat;
-use datafusion::datasource::file_format::FileFormat;
-use datafusion::datasource::listing::{
- ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
-};
-use datafusion::logical_plan::plan::{
- Aggregate, EmptyRelation, Filter, Join, Projection, Sort, SubqueryAlias, Window,
-};
-use datafusion::logical_plan::{
- provider_as_source, source_as_provider, Column, CreateCatalog, CreateCatalogSchema,
- CreateExternalTable, CreateView, CrossJoin, Expr, JoinConstraint, Limit, LogicalPlan,
- LogicalPlanBuilder, Offset, Repartition, TableScan, Values,
-};
-use datafusion::prelude::SessionContext;
-
-use datafusion_proto::from_proto::parse_expr;
-use prost::bytes::BufMut;
-use prost::Message;
-use protobuf::listing_table_scan_node::FileFormatType;
-use protobuf::logical_plan_node::LogicalPlanType;
-use protobuf::LogicalPlanNode;
-use std::convert::TryInto;
-use std::sync::Arc;
-
pub mod from_proto;
-impl AsLogicalPlan for LogicalPlanNode {
- fn try_decode(buf: &[u8]) -> Result<Self, BallistaError>
- where
- Self: Sized,
- {
- LogicalPlanNode::decode(buf).map_err(|e| {
- BallistaError::Internal(format!("failed to decode logical plan: {:?}", e))
- })
- }
-
- fn try_encode<B>(&self, buf: &mut B) -> Result<(), BallistaError>
- where
- B: BufMut,
- Self: Sized,
- {
- self.encode(buf).map_err(|e| {
- BallistaError::Internal(format!("failed to encode logical plan: {:?}", e))
- })
- }
-
- fn try_into_logical_plan(
- &self,
- ctx: &SessionContext,
- extension_codec: &dyn LogicalExtensionCodec,
- ) -> Result<LogicalPlan, BallistaError> {
- let plan = self.logical_plan_type.as_ref().ok_or_else(|| {
- proto_error(format!(
- "logical_plan::from_proto() Unsupported logical plan '{:?}'",
- self
- ))
- })?;
- match plan {
- LogicalPlanType::Values(values) => {
- let n_cols = values.n_cols as usize;
- let values: Vec<Vec<Expr>> = if values.values_list.is_empty() {
- Ok(Vec::new())
- } else if values.values_list.len() % n_cols != 0 {
- Err(BallistaError::General(format!(
- "Invalid values list length, expect {} to be divisible by {}",
- values.values_list.len(),
- n_cols
- )))
- } else {
- values
- .values_list
- .chunks_exact(n_cols)
- .map(|r| {
- r.iter().map(|expr| parse_expr(expr, ctx)).collect::<Result<
- Vec<_>,
- datafusion_proto::from_proto::Error,
- >>(
- )
- })
- .collect::<Result<Vec<_>, _>>()
- .map_err(|e| e.into())
- }?;
- LogicalPlanBuilder::values(values)?
- .build()
- .map_err(|e| e.into())
- }
- LogicalPlanType::Projection(projection) => {
- let input: LogicalPlan =
- into_logical_plan!(projection.input, ctx, extension_codec)?;
- let x: Vec<Expr> = projection
- .expr
- .iter()
- .map(|expr| parse_expr(expr, ctx))
- .collect::<Result<Vec<_>, _>>()?;
- LogicalPlanBuilder::from(input)
- .project_with_alias(
- x,
- projection.optional_alias.as_ref().map(|a| match a {
- protobuf::projection_node::OptionalAlias::Alias(alias) => {
- alias.clone()
- }
- }),
- )?
- .build()
- .map_err(|e| e.into())
- }
- LogicalPlanType::Selection(selection) => {
- let input: LogicalPlan =
- into_logical_plan!(selection.input, ctx, extension_codec)?;
- let expr: Expr = selection
- .expr
- .as_ref()
- .map(|expr| parse_expr(expr, ctx))
- .transpose()?
- .ok_or_else(|| {
- BallistaError::General("expression required".to_string())
- })?;
- // .try_into()?;
- LogicalPlanBuilder::from(input)
- .filter(expr)?
- .build()
- .map_err(|e| e.into())
- }
- LogicalPlanType::Window(window) => {
- let input: LogicalPlan =
- into_logical_plan!(window.input, ctx, extension_codec)?;
- let window_expr = window
- .window_expr
- .iter()
- .map(|expr| parse_expr(expr, ctx))
- .collect::<Result<Vec<Expr>, _>>()?;
- LogicalPlanBuilder::from(input)
- .window(window_expr)?
- .build()
- .map_err(|e| e.into())
- }
- LogicalPlanType::Aggregate(aggregate) => {
- let input: LogicalPlan =
- into_logical_plan!(aggregate.input, ctx, extension_codec)?;
- let group_expr = aggregate
- .group_expr
- .iter()
- .map(|expr| parse_expr(expr, ctx))
- .collect::<Result<Vec<Expr>, _>>()?;
- let aggr_expr = aggregate
- .aggr_expr
- .iter()
- .map(|expr| parse_expr(expr, ctx))
- .collect::<Result<Vec<Expr>, _>>()?;
- LogicalPlanBuilder::from(input)
- .aggregate(group_expr, aggr_expr)?
- .build()
- .map_err(|e| e.into())
- }
- LogicalPlanType::ListingScan(scan) => {
- let schema: Schema = convert_required!(scan.schema)?;
-
- let mut projection = None;
- if let Some(columns) = &scan.projection {
- let column_indices = columns
- .columns
- .iter()
- .map(|name| schema.index_of(name))
- .collect::<Result<Vec<usize>, _>>()?;
- projection = Some(column_indices);
- }
-
- let filters = scan
- .filters
- .iter()
- .map(|expr| parse_expr(expr, ctx))
- .collect::<Result<Vec<_>, _>>()?;
-
- let file_format: Arc<dyn FileFormat> =
- match scan.file_format_type.as_ref().ok_or_else(|| {
- proto_error(format!(
- "logical_plan::from_proto() Unsupported file format '{:?}'",
- self
- ))
- })? {
- &FileFormatType::Parquet(protobuf::ParquetFormat {
- enable_pruning,
- }) => Arc::new(
- ParquetFormat::default().with_enable_pruning(enable_pruning),
- ),
- FileFormatType::Csv(protobuf::CsvFormat {
- has_header,
- delimiter,
- }) => Arc::new(
- CsvFormat::default()
- .with_has_header(*has_header)
- .with_delimiter(str_to_byte(delimiter)?),
- ),
- FileFormatType::Avro(..) => Arc::new(AvroFormat::default()),
- };
-
- let url = ListingTableUrl::parse(&scan.path)?;
- let options = ListingOptions {
- file_extension: scan.file_extension.clone(),
- format: file_format,
- table_partition_cols: scan.table_partition_cols.clone(),
- collect_stat: scan.collect_stat,
- target_partitions: scan.target_partitions as usize,
- };
-
- let config = ListingTableConfig::new(url)
- .with_listing_options(options)
- .with_schema(Arc::new(schema));
-
- let provider = ListingTable::try_new(config)?;
-
- LogicalPlanBuilder::scan_with_filters(
- &scan.table_name,
- provider_as_source(Arc::new(provider)),
- projection,
- filters,
- )?
- .build()
- .map_err(|e| e.into())
- }
- LogicalPlanType::Sort(sort) => {
- let input: LogicalPlan =
- into_logical_plan!(sort.input, ctx, extension_codec)?;
- let sort_expr: Vec<Expr> = sort
- .expr
- .iter()
- .map(|expr| parse_expr(expr, ctx))
- .collect::<Result<Vec<Expr>, _>>()?;
- LogicalPlanBuilder::from(input)
- .sort(sort_expr)?
- .build()
- .map_err(|e| e.into())
- }
- LogicalPlanType::Repartition(repartition) => {
- use datafusion::logical_plan::Partitioning;
- let input: LogicalPlan =
- into_logical_plan!(repartition.input, ctx, extension_codec)?;
- use protobuf::repartition_node::PartitionMethod;
- let pb_partition_method = repartition.partition_method.clone().ok_or_else(|| {
- BallistaError::General(String::from(
- "Protobuf deserialization error, RepartitionNode was missing required field 'partition_method'",
- ))
- })?;
-
- let partitioning_scheme = match pb_partition_method {
- PartitionMethod::Hash(protobuf::HashRepartition {
- hash_expr: pb_hash_expr,
- partition_count,
- }) => Partitioning::Hash(
- pb_hash_expr
- .iter()
- .map(|expr| parse_expr(expr, ctx))
- .collect::<Result<Vec<_>, _>>()?,
- partition_count as usize,
- ),
- PartitionMethod::RoundRobin(partition_count) => {
- Partitioning::RoundRobinBatch(partition_count as usize)
- }
- };
-
- LogicalPlanBuilder::from(input)
- .repartition(partitioning_scheme)?
- .build()
- .map_err(|e| e.into())
- }
- LogicalPlanType::EmptyRelation(empty_relation) => {
- LogicalPlanBuilder::empty(empty_relation.produce_one_row)
- .build()
- .map_err(|e| e.into())
- }
- LogicalPlanType::CreateExternalTable(create_extern_table) => {
- let pb_schema = (create_extern_table.schema.clone()).ok_or_else(|| {
- BallistaError::General(String::from(
- "Protobuf deserialization error, CreateExternalTableNode was missing required field schema.",
- ))
- })?;
-
- let pb_file_type: protobuf::FileType =
- create_extern_table.file_type.try_into()?;
-
- Ok(LogicalPlan::CreateExternalTable(CreateExternalTable {
- schema: pb_schema.try_into()?,
- name: create_extern_table.name.clone(),
- location: create_extern_table.location.clone(),
- file_type: pb_file_type.into(),
- has_header: create_extern_table.has_header,
- delimiter: create_extern_table.delimiter.chars().next().ok_or_else(|| {
- BallistaError::General(String::from("Protobuf deserialization error, unable to parse CSV delimiter"))
- })?,
- table_partition_cols: create_extern_table
- .table_partition_cols
- .clone(),
- if_not_exists: create_extern_table.if_not_exists,
- }))
- }
- LogicalPlanType::CreateView(create_view) => {
- let plan = create_view
- .input.clone().ok_or_else(|| BallistaError::General(String::from(
- "Protobuf deserialization error, CreateViewNode has invalid LogicalPlan input.",
- )))?
- .try_into_logical_plan(ctx, extension_codec)?;
-
- Ok(LogicalPlan::CreateView(CreateView {
- name: create_view.name.clone(),
- input: Arc::new(plan),
- or_replace: create_view.or_replace,
- }))
- }
- LogicalPlanType::CreateCatalogSchema(create_catalog_schema) => {
- let pb_schema = (create_catalog_schema.schema.clone()).ok_or_else(|| {
- BallistaError::General(String::from(
- "Protobuf deserialization error, CreateCatalogSchemaNode was missing required field schema.",
- ))
- })?;
-
- Ok(LogicalPlan::CreateCatalogSchema(CreateCatalogSchema {
- schema_name: create_catalog_schema.schema_name.clone(),
- if_not_exists: create_catalog_schema.if_not_exists,
- schema: pb_schema.try_into()?,
- }))
- }
- LogicalPlanType::CreateCatalog(create_catalog) => {
- let pb_schema = (create_catalog.schema.clone()).ok_or_else(|| {
- BallistaError::General(String::from(
- "Protobuf deserialization error, CreateCatalogNode was missing required field schema.",
- ))
- })?;
-
- Ok(LogicalPlan::CreateCatalog(CreateCatalog {
- catalog_name: create_catalog.catalog_name.clone(),
- if_not_exists: create_catalog.if_not_exists,
- schema: pb_schema.try_into()?,
- }))
- }
- LogicalPlanType::Analyze(analyze) => {
- let input: LogicalPlan =
- into_logical_plan!(analyze.input, ctx, extension_codec)?;
- LogicalPlanBuilder::from(input)
- .explain(analyze.verbose, true)?
- .build()
- .map_err(|e| e.into())
- }
- LogicalPlanType::Explain(explain) => {
- let input: LogicalPlan =
- into_logical_plan!(explain.input, ctx, extension_codec)?;
- LogicalPlanBuilder::from(input)
- .explain(explain.verbose, false)?
- .build()
- .map_err(|e| e.into())
- }
- LogicalPlanType::SubqueryAlias(aliased_relation) => {
- let input: LogicalPlan =
- into_logical_plan!(aliased_relation.input, ctx, extension_codec)?;
- LogicalPlanBuilder::from(input)
- .alias(&aliased_relation.alias)?
- .build()
- .map_err(|e| e.into())
- }
- LogicalPlanType::Limit(limit) => {
- let input: LogicalPlan =
- into_logical_plan!(limit.input, ctx, extension_codec)?;
- LogicalPlanBuilder::from(input)
- .limit(limit.limit as usize)?
- .build()
- .map_err(|e| e.into())
- }
- LogicalPlanType::Offset(offset) => {
- let input: LogicalPlan =
- into_logical_plan!(offset.input, ctx, extension_codec)?;
- LogicalPlanBuilder::from(input)
- .offset(offset.offset as usize)?
- .build()
- .map_err(|e| e.into())
- }
- LogicalPlanType::Join(join) => {
- let left_keys: Vec<Column> =
- join.left_join_column.iter().map(|i| i.into()).collect();
- let right_keys: Vec<Column> =
- join.right_join_column.iter().map(|i| i.into()).collect();
- let join_type =
- protobuf::JoinType::from_i32(join.join_type).ok_or_else(|| {
- proto_error(format!(
- "Received a JoinNode message with unknown JoinType {}",
- join.join_type
- ))
- })?;
- let join_constraint = protobuf::JoinConstraint::from_i32(
- join.join_constraint,
- )
- .ok_or_else(|| {
- proto_error(format!(
- "Received a JoinNode message with unknown JoinConstraint {}",
- join.join_constraint
- ))
- })?;
- let filter: Option<Expr> = join
- .filter
- .as_ref()
- .map(|expr| parse_expr(expr, ctx))
- .map_or(Ok(None), |v| v.map(Some))?;
-
- let builder = LogicalPlanBuilder::from(into_logical_plan!(
- join.left,
- ctx,
- extension_codec
- )?);
- let builder = match join_constraint.into() {
- JoinConstraint::On => builder.join(
- &into_logical_plan!(join.right, ctx, extension_codec)?,
- join_type.into(),
- (left_keys, right_keys),
- filter,
- )?,
- JoinConstraint::Using => builder.join_using(
- &into_logical_plan!(join.right, ctx, extension_codec)?,
- join_type.into(),
- left_keys,
- )?,
- };
-
- builder.build().map_err(|e| e.into())
- }
- LogicalPlanType::Union(union) => {
- let mut input_plans: Vec<LogicalPlan> = union
- .inputs
- .iter()
- .map(|i| i.try_into_logical_plan(ctx, extension_codec))
- .collect::<Result<_, BallistaError>>()?;
-
- if input_plans.len() < 2 {
- return Err( BallistaError::General(String::from(
- "Protobuf deserialization error, Union was require at least two input.",
- )));
- }
-
- let mut builder = LogicalPlanBuilder::from(input_plans.pop().unwrap());
- for plan in input_plans {
- builder = builder.union(plan)?;
- }
- builder.build().map_err(|e| e.into())
- }
- LogicalPlanType::CrossJoin(crossjoin) => {
- let left = into_logical_plan!(crossjoin.left, ctx, extension_codec)?;
- let right = into_logical_plan!(crossjoin.right, ctx, extension_codec)?;
-
- LogicalPlanBuilder::from(left)
- .cross_join(&right)?
- .build()
- .map_err(|e| e.into())
- }
- LogicalPlanType::Extension(LogicalExtensionNode { node, inputs }) => {
- let input_plans: Vec<LogicalPlan> = inputs
- .iter()
- .map(|i| i.try_into_logical_plan(ctx, extension_codec))
- .collect::<Result<_, BallistaError>>()?;
-
- let extension_node =
- extension_codec.try_decode(node, &input_plans, ctx)?;
- Ok(LogicalPlan::Extension(extension_node))
- }
- }
- }
-
- fn try_from_logical_plan(
- plan: &LogicalPlan,
- extension_codec: &dyn LogicalExtensionCodec,
- ) -> Result<Self, BallistaError>
- where
- Self: Sized,
- {
- match plan {
- LogicalPlan::Values(Values { values, .. }) => {
- let n_cols = if values.is_empty() {
- 0
- } else {
- values[0].len()
- } as u64;
- let values_list = values
- .iter()
- .flatten()
- .map(|v| v.try_into())
- .collect::<Result<Vec<_>, _>>()?;
- Ok(protobuf::LogicalPlanNode {
- logical_plan_type: Some(LogicalPlanType::Values(
- protobuf::ValuesNode {
- n_cols,
- values_list,
- },
- )),
- })
- }
- LogicalPlan::TableScan(TableScan {
- table_name,
- source,
- filters,
- projection,
- ..
- }) => {
- let source = source_as_provider(source)?;
- let schema = source.schema();
- let source = source.as_any();
-
- let projection = match projection {
- None => None,
- Some(columns) => {
- let column_names = columns
- .iter()
- .map(|i| schema.field(*i).name().to_owned())
- .collect();
- Some(protobuf::ProjectionColumns {
- columns: column_names,
- })
- }
- };
- let schema: datafusion_proto::protobuf::Schema = schema.as_ref().into();
-
- let filters: Vec<datafusion_proto::protobuf::LogicalExprNode> = filters
- .iter()
- .map(|filter| filter.try_into())
- .collect::<Result<Vec<_>, _>>()?;
-
- if let Some(listing_table) = source.downcast_ref::<ListingTable>() {
- let any = listing_table.options().format.as_any();
- let file_format_type = if let Some(parquet) =
- any.downcast_ref::<ParquetFormat>()
- {
- FileFormatType::Parquet(protobuf::ParquetFormat {
- enable_pruning: parquet.enable_pruning(),
- })
- } else if let Some(csv) = any.downcast_ref::<CsvFormat>() {
- FileFormatType::Csv(protobuf::CsvFormat {
- delimiter: byte_to_string(csv.delimiter())?,
- has_header: csv.has_header(),
- })
- } else if any.is::<AvroFormat>() {
- FileFormatType::Avro(protobuf::AvroFormat {})
- } else {
- return Err(proto_error(format!(
- "Error converting file format, {:?} is invalid as a datafusion foramt.",
- listing_table.options().format
- )));
- };
- Ok(protobuf::LogicalPlanNode {
- logical_plan_type: Some(LogicalPlanType::ListingScan(
- protobuf::ListingTableScanNode {
- file_format_type: Some(file_format_type),
- table_name: table_name.to_owned(),
- collect_stat: listing_table.options().collect_stat,
- file_extension: listing_table
- .options()
- .file_extension
- .clone(),
- table_partition_cols: listing_table
- .options()
- .table_partition_cols
- .clone(),
- path: listing_table.table_path().to_string(),
- schema: Some(schema),
- projection,
- filters,
- target_partitions: listing_table
- .options()
- .target_partitions
- as u32,
- },
- )),
- })
- } else {
- Err(BallistaError::General(format!(
- "logical plan to_proto unsupported table provider {:?}",
- source
- )))
- }
- }
- LogicalPlan::Projection(Projection {
- expr, input, alias, ..
- }) => Ok(protobuf::LogicalPlanNode {
- logical_plan_type: Some(LogicalPlanType::Projection(Box::new(
- protobuf::ProjectionNode {
- input: Some(Box::new(
- protobuf::LogicalPlanNode::try_from_logical_plan(
- input.as_ref(),
- extension_codec,
- )?,
- )),
- expr: expr.iter().map(|expr| expr.try_into()).collect::<Result<
- Vec<_>,
- datafusion_proto::to_proto::Error,
- >>(
- )?,
- optional_alias: alias
- .clone()
- .map(protobuf::projection_node::OptionalAlias::Alias),
- },
- ))),
- }),
- LogicalPlan::Filter(Filter { predicate, input }) => {
- let input: protobuf::LogicalPlanNode =
- protobuf::LogicalPlanNode::try_from_logical_plan(
- input.as_ref(),
- extension_codec,
- )?;
- Ok(protobuf::LogicalPlanNode {
- logical_plan_type: Some(LogicalPlanType::Selection(Box::new(
- protobuf::SelectionNode {
- input: Some(Box::new(input)),
- expr: Some(predicate.try_into()?),
- },
- ))),
- })
- }
- LogicalPlan::Window(Window {
- input, window_expr, ..
- }) => {
- let input: protobuf::LogicalPlanNode =
- protobuf::LogicalPlanNode::try_from_logical_plan(
- input.as_ref(),
- extension_codec,
- )?;
- Ok(protobuf::LogicalPlanNode {
- logical_plan_type: Some(LogicalPlanType::Window(Box::new(
- protobuf::WindowNode {
- input: Some(Box::new(input)),
- window_expr: window_expr
- .iter()
- .map(|expr| expr.try_into())
- .collect::<Result<Vec<_>, _>>()?,
- },
- ))),
- })
- }
- LogicalPlan::Aggregate(Aggregate {
- group_expr,
- aggr_expr,
- input,
- ..
- }) => {
- let input: protobuf::LogicalPlanNode =
- protobuf::LogicalPlanNode::try_from_logical_plan(
- input.as_ref(),
- extension_codec,
- )?;
- Ok(protobuf::LogicalPlanNode {
- logical_plan_type: Some(LogicalPlanType::Aggregate(Box::new(
- protobuf::AggregateNode {
- input: Some(Box::new(input)),
- group_expr: group_expr
- .iter()
- .map(|expr| expr.try_into())
- .collect::<Result<Vec<_>, _>>()?,
- aggr_expr: aggr_expr
- .iter()
- .map(|expr| expr.try_into())
- .collect::<Result<Vec<_>, _>>()?,
- },
- ))),
- })
- }
- LogicalPlan::Join(Join {
- left,
- right,
- on,
- filter,
- join_type,
- join_constraint,
- null_equals_null,
- ..
- }) => {
- let left: protobuf::LogicalPlanNode =
- protobuf::LogicalPlanNode::try_from_logical_plan(
- left.as_ref(),
- extension_codec,
- )?;
- let right: protobuf::LogicalPlanNode =
- protobuf::LogicalPlanNode::try_from_logical_plan(
- right.as_ref(),
- extension_codec,
- )?;
- let (left_join_column, right_join_column) =
- on.iter().map(|(l, r)| (l.into(), r.into())).unzip();
- let join_type: protobuf::JoinType = join_type.to_owned().into();
- let join_constraint: protobuf::JoinConstraint =
- join_constraint.to_owned().into();
- let filter = filter
- .as_ref()
- .map(|e| e.try_into())
- .map_or(Ok(None), |v| v.map(Some))?;
-
- Ok(protobuf::LogicalPlanNode {
- logical_plan_type: Some(LogicalPlanType::Join(Box::new(
- protobuf::JoinNode {
- left: Some(Box::new(left)),
- right: Some(Box::new(right)),
- join_type: join_type.into(),
- join_constraint: join_constraint.into(),
- left_join_column,
- right_join_column,
- null_equals_null: *null_equals_null,
- filter,
- },
- ))),
- })
- }
- LogicalPlan::Subquery(_) => {
- // note that the ballista and datafusion proto files need refactoring to allow
- // LogicalExprNode to reference a LogicalPlanNode
- // see https://github.com/apache/arrow-datafusion/issues/2338
- Err(BallistaError::NotImplemented(
- "Ballista does not support subqueries".to_string(),
- ))
- }
- LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
- let input: protobuf::LogicalPlanNode =
- protobuf::LogicalPlanNode::try_from_logical_plan(
- input.as_ref(),
- extension_codec,
- )?;
- Ok(protobuf::LogicalPlanNode {
- logical_plan_type: Some(LogicalPlanType::SubqueryAlias(Box::new(
- protobuf::SubqueryAliasNode {
- input: Some(Box::new(input)),
- alias: alias.clone(),
- },
- ))),
- })
- }
- LogicalPlan::Limit(Limit { input, n }) => {
- let input: protobuf::LogicalPlanNode =
- protobuf::LogicalPlanNode::try_from_logical_plan(
- input.as_ref(),
- extension_codec,
- )?;
- Ok(protobuf::LogicalPlanNode {
- logical_plan_type: Some(LogicalPlanType::Limit(Box::new(
- protobuf::LimitNode {
- input: Some(Box::new(input)),
- limit: *n as u32,
- },
- ))),
- })
- }
- LogicalPlan::Offset(Offset { input, offset }) => {
- let input: protobuf::LogicalPlanNode =
- protobuf::LogicalPlanNode::try_from_logical_plan(
- input.as_ref(),
- extension_codec,
- )?;
- Ok(protobuf::LogicalPlanNode {
- logical_plan_type: Some(LogicalPlanType::Offset(Box::new(
- protobuf::OffsetNode {
- input: Some(Box::new(input)),
- offset: *offset as u32,
- },
- ))),
- })
- }
- LogicalPlan::Sort(Sort { input, expr }) => {
- let input: protobuf::LogicalPlanNode =
- protobuf::LogicalPlanNode::try_from_logical_plan(
- input.as_ref(),
- extension_codec,
- )?;
- let selection_expr: Vec<datafusion_proto::protobuf::LogicalExprNode> =
- expr.iter()
- .map(|expr| expr.try_into())
- .collect::<Result<Vec<_>, datafusion_proto::to_proto::Error>>()?;
- Ok(protobuf::LogicalPlanNode {
- logical_plan_type: Some(LogicalPlanType::Sort(Box::new(
- protobuf::SortNode {
- input: Some(Box::new(input)),
- expr: selection_expr,
- },
- ))),
- })
- }
- LogicalPlan::Repartition(Repartition {
- input,
- partitioning_scheme,
- }) => {
- use datafusion::logical_plan::Partitioning;
- let input: protobuf::LogicalPlanNode =
- protobuf::LogicalPlanNode::try_from_logical_plan(
- input.as_ref(),
- extension_codec,
- )?;
-
- // Assumed common usize field was batch size
- // Used u64 to avoid any nastyness involving large values, most data clusters are probably uniformly 64 bits any ways
- use protobuf::repartition_node::PartitionMethod;
-
- let pb_partition_method =
- match partitioning_scheme {
- Partitioning::Hash(exprs, partition_count) => {
- PartitionMethod::Hash(protobuf::HashRepartition {
- hash_expr: exprs
- .iter()
- .map(|expr| expr.try_into())
- .collect::<Result<
- Vec<_>,
- datafusion_proto::to_proto::Error,
- >>()?,
- partition_count: *partition_count as u64,
- })
- }
- Partitioning::RoundRobinBatch(partition_count) => {
- PartitionMethod::RoundRobin(*partition_count as u64)
- }
- };
-
- Ok(protobuf::LogicalPlanNode {
- logical_plan_type: Some(LogicalPlanType::Repartition(Box::new(
- protobuf::RepartitionNode {
- input: Some(Box::new(input)),
- partition_method: Some(pb_partition_method),
- },
- ))),
- })
- }
- LogicalPlan::EmptyRelation(EmptyRelation {
- produce_one_row, ..
- }) => Ok(protobuf::LogicalPlanNode {
- logical_plan_type: Some(LogicalPlanType::EmptyRelation(
- protobuf::EmptyRelationNode {
- produce_one_row: *produce_one_row,
- },
- )),
- }),
- LogicalPlan::CreateExternalTable(CreateExternalTable {
- name,
- location,
- file_type,
- has_header,
- delimiter,
- schema: df_schema,
- table_partition_cols,
- if_not_exists,
- }) => {
- use datafusion::logical_plan::FileType;
-
- let pb_file_type: protobuf::FileType = match file_type {
- FileType::NdJson => protobuf::FileType::NdJson,
- FileType::Parquet => protobuf::FileType::Parquet,
- FileType::CSV => protobuf::FileType::Csv,
- FileType::Avro => protobuf::FileType::Avro,
- };
-
- Ok(protobuf::LogicalPlanNode {
- logical_plan_type: Some(LogicalPlanType::CreateExternalTable(
- protobuf::CreateExternalTableNode {
- name: name.clone(),
- location: location.clone(),
- file_type: pb_file_type as i32,
- has_header: *has_header,
- schema: Some(df_schema.into()),
- table_partition_cols: table_partition_cols.clone(),
- if_not_exists: *if_not_exists,
- delimiter: String::from(*delimiter),
- },
- )),
- })
- }
- LogicalPlan::CreateView(CreateView {
- name,
- input,
- or_replace,
- }) => Ok(protobuf::LogicalPlanNode {
- logical_plan_type: Some(LogicalPlanType::CreateView(Box::new(
- protobuf::CreateViewNode {
- name: name.clone(),
- input: Some(Box::new(LogicalPlanNode::try_from_logical_plan(
- input,
- extension_codec,
- )?)),
- or_replace: *or_replace,
- },
- ))),
- }),
- LogicalPlan::CreateCatalogSchema(CreateCatalogSchema {
- schema_name,
- if_not_exists,
- schema: df_schema,
- }) => Ok(protobuf::LogicalPlanNode {
- logical_plan_type: Some(LogicalPlanType::CreateCatalogSchema(
- protobuf::CreateCatalogSchemaNode {
- schema_name: schema_name.clone(),
- if_not_exists: *if_not_exists,
- schema: Some(df_schema.into()),
- },
- )),
- }),
- LogicalPlan::CreateCatalog(CreateCatalog {
- catalog_name,
- if_not_exists,
- schema: df_schema,
- }) => Ok(protobuf::LogicalPlanNode {
- logical_plan_type: Some(LogicalPlanType::CreateCatalog(
- protobuf::CreateCatalogNode {
- catalog_name: catalog_name.clone(),
- if_not_exists: *if_not_exists,
- schema: Some(df_schema.into()),
- },
- )),
- }),
- LogicalPlan::Analyze(a) => {
- let input = protobuf::LogicalPlanNode::try_from_logical_plan(
- a.input.as_ref(),
- extension_codec,
- )?;
- Ok(protobuf::LogicalPlanNode {
- logical_plan_type: Some(LogicalPlanType::Analyze(Box::new(
- protobuf::AnalyzeNode {
- input: Some(Box::new(input)),
- verbose: a.verbose,
- },
- ))),
- })
- }
- LogicalPlan::Explain(a) => {
- let input = protobuf::LogicalPlanNode::try_from_logical_plan(
- a.plan.as_ref(),
- extension_codec,
- )?;
- Ok(protobuf::LogicalPlanNode {
- logical_plan_type: Some(LogicalPlanType::Explain(Box::new(
- protobuf::ExplainNode {
- input: Some(Box::new(input)),
- verbose: a.verbose,
- },
- ))),
- })
- }
- LogicalPlan::Union(union) => {
- let inputs: Vec<LogicalPlanNode> = union
- .inputs
- .iter()
- .map(|i| {
- protobuf::LogicalPlanNode::try_from_logical_plan(
- i,
- extension_codec,
- )
- })
- .collect::<Result<_, BallistaError>>()?;
- Ok(protobuf::LogicalPlanNode {
- logical_plan_type: Some(LogicalPlanType::Union(
- protobuf::UnionNode { inputs },
- )),
- })
- }
- LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
- let left = protobuf::LogicalPlanNode::try_from_logical_plan(
- left.as_ref(),
- extension_codec,
- )?;
- let right = protobuf::LogicalPlanNode::try_from_logical_plan(
- right.as_ref(),
- extension_codec,
- )?;
- Ok(protobuf::LogicalPlanNode {
- logical_plan_type: Some(LogicalPlanType::CrossJoin(Box::new(
- protobuf::CrossJoinNode {
- left: Some(Box::new(left)),
- right: Some(Box::new(right)),
- },
- ))),
- })
- }
- LogicalPlan::Extension(extension) => {
- let mut buf: Vec<u8> = vec![];
- extension_codec.try_encode(extension, &mut buf)?;
-
- let inputs: Vec<LogicalPlanNode> = extension
- .node
- .inputs()
- .iter()
- .map(|i| {
- protobuf::LogicalPlanNode::try_from_logical_plan(
- i,
- extension_codec,
- )
- })
- .collect::<Result<_, BallistaError>>()?;
-
- Ok(protobuf::LogicalPlanNode {
- logical_plan_type: Some(LogicalPlanType::Extension(
- LogicalExtensionNode { node: buf, inputs },
- )),
- })
- }
- LogicalPlan::CreateMemoryTable(_) => Err(proto_error(
- "Error converting CreateMemoryTable. Not yet supported in Ballista",
- )),
- LogicalPlan::DropTable(_) => Err(proto_error(
- "Error converting DropTable. Not yet supported in Ballista",
- )),
- }
- }
-}
-
#[macro_export]
macro_rules! into_logical_plan {
($PB:expr, $CTX:expr, $CODEC:expr) => {{
@@ -1116,11 +110,11 @@ mod roundtrip_tests {
($initial_struct:ident) => {
let ctx = SessionContext::new();
let codec: BallistaCodec<
- protobuf::LogicalPlanNode,
+ datafusion_proto::protobuf::LogicalPlanNode,
protobuf::PhysicalPlanNode,
> = BallistaCodec::default();
- let proto: protobuf::LogicalPlanNode =
- protobuf::LogicalPlanNode::try_from_logical_plan(
+ let proto: datafusion_proto::protobuf::LogicalPlanNode =
+ datafusion_proto::protobuf::LogicalPlanNode::try_from_logical_plan(
&$initial_struct,
codec.logical_extension_codec(),
)
@@ -1139,7 +133,7 @@ mod roundtrip_tests {
protobuf::LogicalPlanNode,
protobuf::PhysicalPlanNode,
> = BallistaCodec::default();
- let proto: protobuf::LogicalPlanNode =
+ let proto: datafusion_proto::protobuf::LogicalPlanNode =
protobuf::LogicalPlanNode::try_from_logical_plan(&$initial_struct)
.expect("from logical plan");
let round_trip: LogicalPlan = proto
@@ -1336,8 +330,10 @@ mod roundtrip_tests {
#[tokio::test]
async fn roundtrip_logical_plan_custom_ctx() -> Result<()> {
let ctx = SessionContext::new();
- let codec: BallistaCodec<protobuf::LogicalPlanNode, protobuf::PhysicalPlanNode> =
- BallistaCodec::default();
+ let codec: BallistaCodec<
+ datafusion_proto::protobuf::LogicalPlanNode,
+ protobuf::PhysicalPlanNode,
+ > = BallistaCodec::default();
let custom_object_store = Arc::new(TestObjectStore {});
ctx.runtime_env()
.register_object_store("test", custom_object_store.clone());
@@ -1357,8 +353,8 @@ mod roundtrip_tests {
.await?
.to_logical_plan()?;
- let proto: protobuf::LogicalPlanNode =
- protobuf::LogicalPlanNode::try_from_logical_plan(
+ let proto: datafusion_proto::protobuf::LogicalPlanNode =
+ datafusion_proto::protobuf::LogicalPlanNode::try_from_logical_plan(
&plan,
codec.logical_extension_codec(),
)
diff --git a/ballista/rust/core/src/serde/mod.rs b/ballista/rust/core/src/serde/mod.rs
index b34b5cb4..a955c099 100644
--- a/ballista/rust/core/src/serde/mod.rs
+++ b/ballista/rust/core/src/serde/mod.rs
@@ -18,24 +18,20 @@
//! This crate contains code generated from the Ballista Protocol Buffer Definition as well
//! as convenience code for interacting with the generated code.
-use prost::bytes::BufMut;
-use std::fmt::Debug;
-use std::marker::PhantomData;
-use std::sync::Arc;
-use std::{convert::TryInto, io::Cursor};
-
-use datafusion::logical_plan::{
- FunctionRegistry, JoinConstraint, JoinType, LogicalPlan, Operator,
-};
-
use crate::{error::BallistaError, serde::scheduler::Action as BallistaAction};
-
use datafusion::execution::runtime_env::RuntimeEnv;
-use datafusion::logical_plan::plan::Extension;
+use datafusion::logical_plan::{FunctionRegistry, JoinConstraint, JoinType, Operator};
use datafusion::physical_plan::join_utils::JoinSide;
use datafusion::physical_plan::ExecutionPlan;
-use datafusion::prelude::SessionContext;
+use datafusion_proto::logical_plan::{
+ AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
+};
+use prost::bytes::BufMut;
use prost::Message;
+use std::fmt::Debug;
+use std::marker::PhantomData;
+use std::sync::Arc;
+use std::{convert::TryInto, io::Cursor};
// include the generated protobuf source as a submodule
#[allow(clippy::all)]
@@ -59,71 +55,6 @@ pub(crate) fn proto_error<S: Into<String>>(message: S) -> BallistaError {
BallistaError::General(message.into())
}
-pub trait AsLogicalPlan: Debug + Send + Sync + Clone {
- fn try_decode(buf: &[u8]) -> Result<Self, BallistaError>
- where
- Self: Sized;
-
- fn try_encode<B>(&self, buf: &mut B) -> Result<(), BallistaError>
- where
- B: BufMut,
- Self: Sized;
-
- fn try_into_logical_plan(
- &self,
- ctx: &SessionContext,
- extension_codec: &dyn LogicalExtensionCodec,
- ) -> Result<LogicalPlan, BallistaError>;
-
- fn try_from_logical_plan(
- plan: &LogicalPlan,
- extension_codec: &dyn LogicalExtensionCodec,
- ) -> Result<Self, BallistaError>
- where
- Self: Sized;
-}
-
-pub trait LogicalExtensionCodec: Debug + Send + Sync {
- fn try_decode(
- &self,
- buf: &[u8],
- inputs: &[LogicalPlan],
- ctx: &SessionContext,
- ) -> Result<Extension, BallistaError>;
-
- fn try_encode(
- &self,
- node: &Extension,
- buf: &mut Vec<u8>,
- ) -> Result<(), BallistaError>;
-}
-
-#[derive(Debug, Clone)]
-pub struct DefaultLogicalExtensionCodec {}
-
-impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
- fn try_decode(
- &self,
- _buf: &[u8],
- _inputs: &[LogicalPlan],
- _ctx: &SessionContext,
- ) -> Result<Extension, BallistaError> {
- Err(BallistaError::NotImplemented(
- "LogicalExtensionCodec is not provided".to_string(),
- ))
- }
-
- fn try_encode(
- &self,
- _node: &Extension,
- _buf: &mut Vec<u8>,
- ) -> Result<(), BallistaError> {
- Err(BallistaError::NotImplemented(
- "LogicalExtensionCodec is not provided".to_string(),
- ))
- }
-}
-
pub trait AsExecutionPlan: Debug + Send + Sync + Clone {
fn try_decode(buf: &[u8]) -> Result<Self, BallistaError>
where
@@ -412,10 +343,11 @@ mod tests {
}
use crate::error::BallistaError;
- use crate::serde::protobuf::{LogicalPlanNode, PhysicalPlanNode};
+ use crate::serde::protobuf::PhysicalPlanNode;
use crate::serde::{
AsExecutionPlan, AsLogicalPlan, LogicalExtensionCodec, PhysicalExtensionCodec,
};
+ use datafusion_proto::protobuf::LogicalPlanNode;
use proto::{TopKExecProto, TopKPlanProto};
struct TopKPlanNode {
@@ -620,10 +552,10 @@ mod tests {
buf: &[u8],
inputs: &[LogicalPlan],
ctx: &SessionContext,
- ) -> Result<Extension, BallistaError> {
+ ) -> Result<Extension, DataFusionError> {
if let Some((input, _)) = inputs.split_first() {
let proto = TopKPlanProto::decode(buf).map_err(|e| {
- BallistaError::Internal(format!(
+ DataFusionError::Internal(format!(
"failed to decode logical plan: {:?}",
e
))
@@ -640,10 +572,10 @@ mod tests {
node: Arc::new(node),
})
} else {
- Err(BallistaError::from("invalid plan, no expr".to_string()))
+ Err(DataFusionError::Plan("invalid plan, no expr".to_string()))
}
} else {
- Err(BallistaError::from("invalid plan, no input".to_string()))
+ Err(DataFusionError::Plan("invalid plan, no input".to_string()))
}
}
@@ -651,7 +583,7 @@ mod tests {
&self,
node: &Extension,
buf: &mut Vec<u8>,
- ) -> Result<(), BallistaError> {
+ ) -> Result<(), DataFusionError> {
if let Some(exec) = node.node.as_any().downcast_ref::<TopKPlanNode>() {
let proto = TopKPlanProto {
k: exec.k as u64,
@@ -659,7 +591,7 @@ mod tests {
};
proto.encode(buf).map_err(|e| {
- BallistaError::Internal(format!(
+ DataFusionError::Internal(format!(
"failed to encode logical plan: {:?}",
e
))
@@ -667,7 +599,7 @@ mod tests {
Ok(())
} else {
- Err(BallistaError::from("unsupported plan type".to_string()))
+ Err(DataFusionError::Plan("unsupported plan type".to_string()))
}
}
}
diff --git a/ballista/rust/core/src/serde/physical_plan/mod.rs b/ballista/rust/core/src/serde/physical_plan/mod.rs
index a0b12391..cbe8c984 100644
--- a/ballista/rust/core/src/serde/physical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/physical_plan/mod.rs
@@ -1160,8 +1160,9 @@ mod roundtrip_tests {
};
use crate::execution_plans::ShuffleWriterExec;
- use crate::serde::protobuf::{LogicalPlanNode, PhysicalPlanNode};
+ use crate::serde::protobuf::PhysicalPlanNode;
use crate::serde::{AsExecutionPlan, BallistaCodec};
+ use datafusion_proto::protobuf::LogicalPlanNode;
use super::super::super::error::Result;
use super::super::protobuf;
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index 1418aecb..7493b1ee 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -15,21 +15,12 @@
// specific language governing permissions and limitations
// under the License.
-use std::io::{BufWriter, Write};
-use std::marker::PhantomData;
-
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::Arc;
-use std::{fs::File, pin::Pin};
-
+use crate::config::BallistaConfig;
use crate::error::{BallistaError, Result};
use crate::execution_plans::{
DistributedQueryExec, ShuffleWriterExec, UnresolvedShuffleExec,
};
use crate::serde::scheduler::PartitionStats;
-
-use crate::config::BallistaConfig;
-use crate::serde::{AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec};
use async_trait::async_trait;
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::{ipc::writer::FileWriter, record_batch::RecordBatch};
@@ -37,22 +28,28 @@ use datafusion::error::DataFusionError;
use datafusion::execution::context::{
QueryPlanner, SessionConfig, SessionContext, SessionState,
};
+use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::logical_plan::LogicalPlan;
-
+use datafusion::physical_plan::aggregates::AggregateExec;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::common::batch_byte_size;
use datafusion::physical_plan::empty::EmptyExec;
-
-use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
-use datafusion::physical_plan::aggregates::AggregateExec;
use datafusion::physical_plan::file_format::{CsvExec, ParquetExec};
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::hash_join::HashJoinExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::{metrics, ExecutionPlan, RecordBatchStream};
+use datafusion_proto::logical_plan::{
+ AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
+};
use futures::StreamExt;
+use std::io::{BufWriter, Write};
+use std::marker::PhantomData;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+use std::{fs::File, pin::Pin};
/// Stream data to disk in Arrow IPC format
diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml
index cc3e8f19..439d19d5 100644
--- a/ballista/rust/executor/Cargo.toml
+++ b/ballista/rust/executor/Cargo.toml
@@ -41,6 +41,7 @@ ballista-core = { path = "../core", version = "0.7.0" }
chrono = { version = "0.4", default-features = false }
configure_me = "0.4.0"
datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
env_logger = "0.9"
futures = "0.3"
hyper = "0.14.4"
diff --git a/ballista/rust/executor/src/execution_loop.rs b/ballista/rust/executor/src/execution_loop.rs
index 06128f8d..e1a55a3e 100644
--- a/ballista/rust/executor/src/execution_loop.rs
+++ b/ballista/rust/executor/src/execution_loop.rs
@@ -15,28 +15,26 @@
// specific language governing permissions and limitations
// under the License.
-use std::collections::HashMap;
-use std::ops::Deref;
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::mpsc::{Receiver, Sender, TryRecvError};
-use std::{sync::Arc, time::Duration};
-
-use datafusion::physical_plan::ExecutionPlan;
-use log::{debug, error, info, warn};
-use tonic::transport::Channel;
-
-use ballista_core::serde::protobuf::{
- scheduler_grpc_client::SchedulerGrpcClient, PollWorkParams, PollWorkResult,
- TaskDefinition, TaskStatus,
-};
-
use crate::as_task_status;
use crate::executor::Executor;
use ballista_core::error::BallistaError;
use ballista_core::serde::physical_plan::from_proto::parse_protobuf_hash_partitioning;
+use ballista_core::serde::protobuf::{
+ scheduler_grpc_client::SchedulerGrpcClient, PollWorkParams, PollWorkResult,
+ TaskDefinition, TaskStatus,
+};
use ballista_core::serde::scheduler::ExecutorSpecification;
-use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan, BallistaCodec};
+use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
use datafusion::execution::context::TaskContext;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion_proto::logical_plan::AsLogicalPlan;
+use log::{debug, error, info, warn};
+use std::collections::HashMap;
+use std::ops::Deref;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::mpsc::{Receiver, Sender, TryRecvError};
+use std::{sync::Arc, time::Duration};
+use tonic::transport::Channel;
pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
mut scheduler: SchedulerGrpcClient<Channel>,
diff --git a/ballista/rust/executor/src/executor_server.rs b/ballista/rust/executor/src/executor_server.rs
index 11a5c755..de202a54 100644
--- a/ballista/rust/executor/src/executor_server.rs
+++ b/ballista/rust/executor/src/executor_server.rs
@@ -37,9 +37,10 @@ use ballista_core::serde::protobuf::{
StopExecutorParams, StopExecutorResult, TaskDefinition, UpdateTaskStatusParams,
};
use ballista_core::serde::scheduler::ExecutorState;
-use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan, BallistaCodec};
+use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::ExecutionPlan;
+use datafusion_proto::logical_plan::AsLogicalPlan;
use crate::as_task_status;
use crate::cpu_bound_executor::DedicatedExecutor;
diff --git a/ballista/rust/executor/src/main.rs b/ballista/rust/executor/src/main.rs
index 825ddd4d..f15d625c 100644
--- a/ballista/rust/executor/src/main.rs
+++ b/ballista/rust/executor/src/main.rs
@@ -35,7 +35,7 @@ use ballista_core::config::TaskSchedulingPolicy;
use ballista_core::error::BallistaError;
use ballista_core::serde::protobuf::{
executor_registration, scheduler_grpc_client::SchedulerGrpcClient,
- ExecutorRegistration, LogicalPlanNode, PhysicalPlanNode,
+ ExecutorRegistration, PhysicalPlanNode,
};
use ballista_core::serde::scheduler::ExecutorSpecification;
use ballista_core::serde::BallistaCodec;
@@ -45,6 +45,7 @@ use ballista_executor::flight_service::BallistaFlightService;
use ballista_executor::metrics::LoggingMetricsCollector;
use config::prelude::*;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+use datafusion_proto::protobuf::LogicalPlanNode;
#[macro_use]
extern crate configure_me;
diff --git a/ballista/rust/executor/src/standalone.rs b/ballista/rust/executor/src/standalone.rs
index d68052af..204ce95f 100644
--- a/ballista/rust/executor/src/standalone.rs
+++ b/ballista/rust/executor/src/standalone.rs
@@ -15,12 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-use std::sync::Arc;
-
+use crate::metrics::LoggingMetricsCollector;
+use crate::{execution_loop, executor::Executor, flight_service::BallistaFlightService};
use arrow_flight::flight_service_server::FlightServiceServer;
-
use ballista_core::serde::scheduler::ExecutorSpecification;
-use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan, BallistaCodec};
+use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
use ballista_core::{
error::Result,
serde::protobuf::executor_registration::OptionalHost,
@@ -28,15 +27,14 @@ use ballista_core::{
BALLISTA_VERSION,
};
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+use datafusion_proto::logical_plan::AsLogicalPlan;
use log::info;
+use std::sync::Arc;
use tempfile::TempDir;
use tokio::net::TcpListener;
use tonic::transport::{Channel, Server};
use uuid::Uuid;
-use crate::metrics::LoggingMetricsCollector;
-use crate::{execution_loop, executor::Executor, flight_service::BallistaFlightService};
-
pub async fn new_standalone_executor<
T: 'static + AsLogicalPlan,
U: 'static + AsExecutionPlan,
diff --git a/ballista/rust/scheduler/Cargo.toml b/ballista/rust/scheduler/Cargo.toml
index 283bcce5..aa335e61 100644
--- a/ballista/rust/scheduler/Cargo.toml
+++ b/ballista/rust/scheduler/Cargo.toml
@@ -42,6 +42,7 @@ ballista-core = { path = "../core", version = "0.7.0" }
clap = { version = "3", features = ["derive", "cargo"] }
configure_me = "0.4.0"
datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
env_logger = "0.9"
etcd-client = { version = "0.9", optional = true }
futures = "0.3"
diff --git a/ballista/rust/scheduler/src/api/handlers.rs b/ballista/rust/scheduler/src/api/handlers.rs
index 72f17445..b6c322dc 100644
--- a/ballista/rust/scheduler/src/api/handlers.rs
+++ b/ballista/rust/scheduler/src/api/handlers.rs
@@ -11,8 +11,9 @@
// limitations under the License.
use crate::scheduler_server::SchedulerServer;
-use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan};
+use ballista_core::serde::AsExecutionPlan;
use ballista_core::BALLISTA_VERSION;
+use datafusion_proto::logical_plan::AsLogicalPlan;
use warp::Rejection;
#[derive(Debug, serde::Serialize)]
diff --git a/ballista/rust/scheduler/src/api/mod.rs b/ballista/rust/scheduler/src/api/mod.rs
index 98b70463..c3eba901 100644
--- a/ballista/rust/scheduler/src/api/mod.rs
+++ b/ballista/rust/scheduler/src/api/mod.rs
@@ -14,7 +14,8 @@ mod handlers;
use crate::scheduler_server::SchedulerServer;
use anyhow::Result;
-use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan};
+use ballista_core::serde::AsExecutionPlan;
+use datafusion_proto::logical_plan::AsLogicalPlan;
use std::{
pin::Pin,
task::{Context as TaskContext, Poll},
diff --git a/ballista/rust/scheduler/src/main.rs b/ballista/rust/scheduler/src/main.rs
index 43048218..f9f3b6ad 100644
--- a/ballista/rust/scheduler/src/main.rs
+++ b/ballista/rust/scheduler/src/main.rs
@@ -30,15 +30,14 @@ use tower::Service;
use ballista_core::BALLISTA_VERSION;
use ballista_core::{
print_version,
- serde::protobuf::{
- scheduler_grpc_server::SchedulerGrpcServer, LogicalPlanNode, PhysicalPlanNode,
- },
+ serde::protobuf::{scheduler_grpc_server::SchedulerGrpcServer, PhysicalPlanNode},
};
use ballista_scheduler::api::{get_routes, EitherBody, Error};
#[cfg(feature = "etcd")]
use ballista_scheduler::state::backend::etcd::EtcdClient;
#[cfg(feature = "sled")]
use ballista_scheduler::state::backend::standalone::StandaloneClient;
+use datafusion_proto::protobuf::LogicalPlanNode;
use ballista_scheduler::scheduler_server::SchedulerServer;
use ballista_scheduler::state::backend::{StateBackend, StateBackendClient};
diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs
index af1ce1c6..33dc3ac0 100644
--- a/ballista/rust/scheduler/src/planner.rs
+++ b/ballista/rust/scheduler/src/planner.rs
@@ -287,7 +287,8 @@ mod test {
use datafusion::prelude::SessionContext;
use std::ops::Deref;
- use ballista_core::serde::protobuf::{LogicalPlanNode, PhysicalPlanNode};
+ use ballista_core::serde::protobuf::PhysicalPlanNode;
+ use datafusion_proto::protobuf::LogicalPlanNode;
use std::sync::Arc;
use uuid::Uuid;
diff --git a/ballista/rust/scheduler/src/scheduler_server/event_loop.rs b/ballista/rust/scheduler/src/scheduler_server/event_loop.rs
index c343743c..5ba830e8 100644
--- a/ballista/rust/scheduler/src/scheduler_server/event_loop.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/event_loop.rs
@@ -22,15 +22,15 @@ use async_trait::async_trait;
use log::{debug, warn};
use crate::scheduler_server::event::SchedulerServerEvent;
+use crate::scheduler_server::ExecutorsClient;
+use crate::state::task_scheduler::TaskScheduler;
+use crate::state::SchedulerState;
use ballista_core::error::{BallistaError, Result};
use ballista_core::event_loop::EventAction;
use ballista_core::serde::protobuf::{LaunchTaskParams, TaskDefinition};
use ballista_core::serde::scheduler::ExecutorDataChange;
-use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan};
-
-use crate::scheduler_server::ExecutorsClient;
-use crate::state::task_scheduler::TaskScheduler;
-use crate::state::SchedulerState;
+use ballista_core::serde::AsExecutionPlan;
+use datafusion_proto::logical_plan::AsLogicalPlan;
pub(crate) struct SchedulerServerEventAction<
T: 'static + AsLogicalPlan,
diff --git a/ballista/rust/scheduler/src/scheduler_server/external_scaler.rs b/ballista/rust/scheduler/src/scheduler_server/external_scaler.rs
index 4b3966df..1b8d42c2 100644
--- a/ballista/rust/scheduler/src/scheduler_server/external_scaler.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/external_scaler.rs
@@ -20,7 +20,8 @@ use crate::scheduler_server::externalscaler::{
GetMetricsResponse, IsActiveResponse, MetricSpec, MetricValue, ScaledObjectRef,
};
use crate::scheduler_server::SchedulerServer;
-use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan};
+use ballista_core::serde::AsExecutionPlan;
+use datafusion_proto::logical_plan::AsLogicalPlan;
use log::debug;
use tonic::{Request, Response};
diff --git a/ballista/rust/scheduler/src/scheduler_server/grpc.rs b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
index 9216155d..01c2058b 100644
--- a/ballista/rust/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
@@ -15,6 +15,11 @@
// specific language governing permissions and limitations
// under the License.
+use crate::scheduler_server::event::QueryStageSchedulerEvent;
+use crate::scheduler_server::{
+ create_datafusion_context, update_datafusion_context, SchedulerServer,
+};
+use crate::state::task_scheduler::TaskScheduler;
use anyhow::Context;
use ballista_core::config::{BallistaConfig, TaskSchedulingPolicy};
use ballista_core::error::BallistaError;
@@ -32,12 +37,13 @@ use ballista_core::serde::protobuf::{
use ballista_core::serde::scheduler::{
ExecutorData, ExecutorDataChange, ExecutorMetadata,
};
-use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan};
+use ballista_core::serde::AsExecutionPlan;
use datafusion::datafusion_data_access::object_store::{
local::LocalFileSystem, ObjectStore,
};
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
+use datafusion_proto::logical_plan::AsLogicalPlan;
use futures::TryStreamExt;
use log::{debug, error, info, trace, warn};
use rand::{distributions::Alphanumeric, thread_rng, Rng};
@@ -48,12 +54,6 @@ use std::time::Instant;
use std::time::{SystemTime, UNIX_EPOCH};
use tonic::{Request, Response, Status};
-use crate::scheduler_server::event::QueryStageSchedulerEvent;
-use crate::scheduler_server::{
- create_datafusion_context, update_datafusion_context, SchedulerServer,
-};
-use crate::state::task_scheduler::TaskScheduler;
-
#[tonic::async_trait]
impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
for SchedulerServer<T, U>
@@ -561,12 +561,13 @@ mod test {
use crate::state::{backend::standalone::StandaloneClient, SchedulerState};
use ballista_core::error::BallistaError;
use ballista_core::serde::protobuf::{
- executor_registration::OptionalHost, ExecutorRegistration, LogicalPlanNode,
- PhysicalPlanNode, PollWorkParams,
+ executor_registration::OptionalHost, ExecutorRegistration, PhysicalPlanNode,
+ PollWorkParams,
};
use ballista_core::serde::scheduler::ExecutorSpecification;
use ballista_core::serde::BallistaCodec;
use datafusion::execution::context::default_session_builder;
+ use datafusion_proto::protobuf::LogicalPlanNode;
use super::{SchedulerGrpc, SchedulerServer};
diff --git a/ballista/rust/scheduler/src/scheduler_server/mod.rs b/ballista/rust/scheduler/src/scheduler_server/mod.rs
index d61f570c..68dede14 100644
--- a/ballista/rust/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/mod.rs
@@ -15,27 +15,25 @@
// specific language governing permissions and limitations
// under the License.
-use std::collections::HashMap;
-use std::sync::Arc;
-use std::time::{SystemTime, UNIX_EPOCH};
-
-use tokio::sync::RwLock;
-use tonic::transport::Channel;
-
+use crate::scheduler_server::event::{QueryStageSchedulerEvent, SchedulerServerEvent};
+use crate::scheduler_server::event_loop::SchedulerServerEventAction;
+use crate::scheduler_server::query_stage_scheduler::QueryStageScheduler;
+use crate::state::backend::StateBackendClient;
+use crate::state::SchedulerState;
use ballista_core::config::{BallistaConfig, TaskSchedulingPolicy};
use ballista_core::error::Result;
use ballista_core::event_loop::EventLoop;
use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient;
use ballista_core::serde::protobuf::TaskStatus;
-use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan, BallistaCodec};
+use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
use datafusion::execution::context::{default_session_builder, SessionState};
use datafusion::prelude::{SessionConfig, SessionContext};
-
-use crate::scheduler_server::event::{QueryStageSchedulerEvent, SchedulerServerEvent};
-use crate::scheduler_server::event_loop::SchedulerServerEventAction;
-use crate::scheduler_server::query_stage_scheduler::QueryStageScheduler;
-use crate::state::backend::StateBackendClient;
-use crate::state::SchedulerState;
+use datafusion_proto::logical_plan::AsLogicalPlan;
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::{SystemTime, UNIX_EPOCH};
+use tokio::sync::RwLock;
+use tonic::transport::Channel;
// include the generated protobuf source as a submodule
#[allow(clippy::all)]
@@ -288,8 +286,7 @@ mod test {
use ballista_core::error::{BallistaError, Result};
use ballista_core::execution_plans::ShuffleWriterExec;
use ballista_core::serde::protobuf::{
- job_status, task_status, CompletedTask, LogicalPlanNode, PartitionId,
- PhysicalPlanNode, TaskStatus,
+ job_status, task_status, CompletedTask, PartitionId, PhysicalPlanNode, TaskStatus,
};
use ballista_core::serde::scheduler::ExecutorData;
use ballista_core::serde::BallistaCodec;
@@ -298,6 +295,7 @@ mod test {
use datafusion::logical_plan::{col, sum, LogicalPlan};
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion::test_util::scan_empty;
+ use datafusion_proto::protobuf::LogicalPlanNode;
use crate::scheduler_server::event::QueryStageSchedulerEvent;
use crate::scheduler_server::SchedulerServer;
diff --git a/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs b/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
index 58e9f8f3..fe1c2d5d 100644
--- a/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -15,15 +15,14 @@
// specific language governing permissions and limitations
// under the License.
-use std::collections::{HashMap, HashSet};
-use std::sync::Arc;
-
+use crate::planner::{
+ find_unresolved_shuffles, remove_unresolved_shuffles, DistributedPlanner,
+};
+use crate::scheduler_server::event::{QueryStageSchedulerEvent, SchedulerServerEvent};
+use crate::state::SchedulerState;
use async_recursion::async_recursion;
use async_trait::async_trait;
-use log::{debug, error, info, warn};
-
use ballista_core::error::{BallistaError, Result};
-
use ballista_core::event_loop::{EventAction, EventSender};
use ballista_core::execution_plans::UnresolvedShuffleExec;
use ballista_core::serde::protobuf::{
@@ -31,14 +30,12 @@ use ballista_core::serde::protobuf::{
JobStatus, RunningJob, TaskStatus,
};
use ballista_core::serde::scheduler::{ExecutorMetadata, PartitionStats};
-use ballista_core::serde::{protobuf, AsExecutionPlan, AsLogicalPlan};
+use ballista_core::serde::{protobuf, AsExecutionPlan};
use datafusion::physical_plan::ExecutionPlan;
-
-use crate::planner::{
- find_unresolved_shuffles, remove_unresolved_shuffles, DistributedPlanner,
-};
-use crate::scheduler_server::event::{QueryStageSchedulerEvent, SchedulerServerEvent};
-use crate::state::SchedulerState;
+use datafusion_proto::logical_plan::AsLogicalPlan;
+use log::{debug, error, info, warn};
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
pub(crate) struct QueryStageScheduler<
T: 'static + AsLogicalPlan,
diff --git a/ballista/rust/scheduler/src/standalone.rs b/ballista/rust/scheduler/src/standalone.rs
index f333587e..4f358a46 100644
--- a/ballista/rust/scheduler/src/standalone.rs
+++ b/ballista/rust/scheduler/src/standalone.rs
@@ -15,12 +15,13 @@
// specific language governing permissions and limitations
// under the License.
-use ballista_core::serde::protobuf::{LogicalPlanNode, PhysicalPlanNode};
+use ballista_core::serde::protobuf::PhysicalPlanNode;
use ballista_core::serde::BallistaCodec;
use ballista_core::{
error::Result, serde::protobuf::scheduler_grpc_server::SchedulerGrpcServer,
BALLISTA_VERSION,
};
+use datafusion_proto::protobuf::LogicalPlanNode;
use log::info;
use std::{net::SocketAddr, sync::Arc};
use tokio::net::TcpListener;
diff --git a/ballista/rust/scheduler/src/state/mod.rs b/ballista/rust/scheduler/src/state/mod.rs
index 5f301346..de780fab 100644
--- a/ballista/rust/scheduler/src/state/mod.rs
+++ b/ballista/rust/scheduler/src/state/mod.rs
@@ -15,23 +15,20 @@
// specific language governing permissions and limitations
// under the License.
-use std::collections::HashMap;
-use std::sync::Arc;
-use std::time::{Duration, SystemTime, UNIX_EPOCH};
-
-use datafusion::physical_plan::ExecutionPlan;
-
-use ballista_core::error::Result;
-
use crate::scheduler_server::{SessionBuilder, SessionContextRegistry};
-use ballista_core::serde::protobuf::{ExecutorHeartbeat, JobStatus, KeyValuePair};
-use ballista_core::serde::scheduler::ExecutorMetadata;
-use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan, BallistaCodec};
-
use crate::state::backend::StateBackendClient;
use crate::state::executor_manager::ExecutorManager;
use crate::state::persistent_state::PersistentSchedulerState;
use crate::state::stage_manager::StageManager;
+use ballista_core::error::Result;
+use ballista_core::serde::protobuf::{ExecutorHeartbeat, JobStatus, KeyValuePair};
+use ballista_core::serde::scheduler::ExecutorMetadata;
+use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion_proto::logical_plan::AsLogicalPlan;
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
pub mod backend;
mod executor_manager;
@@ -181,11 +178,12 @@ mod test {
use ballista_core::error::BallistaError;
use ballista_core::serde::protobuf::{
- job_status, JobStatus, LogicalPlanNode, PhysicalPlanNode, QueuedJob,
+ job_status, JobStatus, PhysicalPlanNode, QueuedJob,
};
use ballista_core::serde::scheduler::{ExecutorMetadata, ExecutorSpecification};
use ballista_core::serde::BallistaCodec;
use datafusion::execution::context::default_session_builder;
+ use datafusion_proto::protobuf::LogicalPlanNode;
use super::{backend::standalone::StandaloneClient, SchedulerState};
diff --git a/ballista/rust/scheduler/src/state/persistent_state.rs b/ballista/rust/scheduler/src/state/persistent_state.rs
index 428d523a..e9692482 100644
--- a/ballista/rust/scheduler/src/state/persistent_state.rs
+++ b/ballista/rust/scheduler/src/state/persistent_state.rs
@@ -15,7 +15,18 @@
// specific language governing permissions and limitations
// under the License.
+use crate::scheduler_server::{
+ create_datafusion_context, SessionBuilder, SessionContextRegistry,
+};
+use crate::state::backend::StateBackendClient;
+use crate::state::stage_manager::StageKey;
use ballista_core::config::BallistaConfig;
+use ballista_core::error::{BallistaError, Result};
+use ballista_core::serde::protobuf::{JobSessionConfig, JobStatus, KeyValuePair};
+use ballista_core::serde::scheduler::ExecutorMetadata;
+use ballista_core::serde::{protobuf, AsExecutionPlan, BallistaCodec};
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion_proto::logical_plan::AsLogicalPlan;
use log::{debug, error};
use parking_lot::RwLock;
use prost::Message;
@@ -24,19 +35,6 @@ use std::collections::HashMap;
use std::ops::Deref;
use std::sync::Arc;
-use ballista_core::error::{BallistaError, Result};
-
-use ballista_core::serde::protobuf::{JobSessionConfig, JobStatus, KeyValuePair};
-
-use crate::scheduler_server::{
- create_datafusion_context, SessionBuilder, SessionContextRegistry,
-};
-use crate::state::backend::StateBackendClient;
-use crate::state::stage_manager::StageKey;
-use ballista_core::serde::scheduler::ExecutorMetadata;
-use ballista_core::serde::{protobuf, AsExecutionPlan, AsLogicalPlan, BallistaCodec};
-use datafusion::physical_plan::ExecutionPlan;
-
#[derive(Clone)]
pub(crate) struct PersistentSchedulerState<
T: 'static + AsLogicalPlan,
@@ -408,13 +406,12 @@ mod test {
use crate::state::persistent_state::PersistentSchedulerState;
use ballista_core::serde::protobuf::job_status::Status;
- use ballista_core::serde::protobuf::{
- JobStatus, LogicalPlanNode, PhysicalPlanNode, QueuedJob,
- };
+ use ballista_core::serde::protobuf::{JobStatus, PhysicalPlanNode, QueuedJob};
use ballista_core::serde::BallistaCodec;
use datafusion::execution::context::default_session_builder;
use datafusion::logical_plan::LogicalPlanBuilder;
use datafusion::prelude::SessionContext;
+ use datafusion_proto::protobuf::LogicalPlanNode;
use std::sync::Arc;
diff --git a/ballista/rust/scheduler/src/state/task_scheduler.rs b/ballista/rust/scheduler/src/state/task_scheduler.rs
index 132fbcf1..0b40091e 100644
--- a/ballista/rust/scheduler/src/state/task_scheduler.rs
+++ b/ballista/rust/scheduler/src/state/task_scheduler.rs
@@ -26,7 +26,8 @@ use ballista_core::serde::protobuf::{
};
use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto;
use ballista_core::serde::scheduler::{ExecutorData, PartitionId};
-use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan};
+use ballista_core::serde::AsExecutionPlan;
+use datafusion_proto::logical_plan::AsLogicalPlan;
use log::{debug, info};
#[async_trait]
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index a0352ae8..075633da 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -34,6 +34,7 @@ snmalloc = ["snmalloc-rs"]
[dependencies]
ballista = { path = "../ballista/rust/client" }
datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
env_logger = "0.9"
futures = "0.3"
mimalloc = { version = "0.1", optional = true, default-features = false }
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 0c266812..18b1b018 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -17,23 +17,13 @@
//! Benchmark derived from TPC-H. This is not an official TPC-H benchmark.
-use futures::future::join_all;
-use rand::prelude::*;
-use std::ops::Div;
-use std::{
- fs::{self, File},
- io::Write,
- iter::Iterator,
- path::{Path, PathBuf},
- sync::Arc,
- time::{Instant, SystemTime},
-};
-
use ballista::context::BallistaContext;
use ballista::prelude::{
BallistaConfig, BALLISTA_DEFAULT_BATCH_SIZE, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
};
-
+use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
+use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
+use datafusion::datasource::listing::ListingTableUrl;
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_plan::LogicalPlan;
@@ -54,11 +44,18 @@ use datafusion::{
arrow::util::pretty,
datasource::listing::{ListingOptions, ListingTable, ListingTableConfig},
};
-
-use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
-use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
-use datafusion::datasource::listing::ListingTableUrl;
+use futures::future::join_all;
+use rand::prelude::*;
use serde::Serialize;
+use std::ops::Div;
+use std::{
+ fs::{self, File},
+ io::Write,
+ iter::Iterator,
+ path::{Path, PathBuf},
+ sync::Arc,
+ time::{Instant, SystemTime},
+};
use structopt::StructOpt;
#[cfg(feature = "snmalloc")]
@@ -921,13 +918,12 @@ struct QueryResult {
#[cfg(test)]
mod tests {
use super::*;
- use std::env;
- use std::sync::Arc;
-
use datafusion::arrow::array::*;
use datafusion::arrow::util::display::array_value_to_string;
use datafusion::logical_plan::Expr;
use datafusion::logical_plan::Expr::Cast;
+ use std::env;
+ use std::sync::Arc;
#[tokio::test]
async fn q1() -> Result<()> {
@@ -1464,11 +1460,10 @@ mod tests {
mod ballista_round_trip {
use super::*;
- use ballista_core::serde::{
- protobuf, AsExecutionPlan, AsLogicalPlan, BallistaCodec,
- };
+ use ballista_core::serde::{protobuf, AsExecutionPlan, BallistaCodec};
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::physical_plan::ExecutionPlan;
+ use datafusion_proto::logical_plan::AsLogicalPlan;
use std::ops::Deref;
async fn round_trip_query(n: usize) -> Result<()> {
@@ -1477,7 +1472,7 @@ mod tests {
.with_batch_size(10);
let ctx = SessionContext::with_config(config);
let codec: BallistaCodec<
- protobuf::LogicalPlanNode,
+ datafusion_proto::protobuf::LogicalPlanNode,
protobuf::PhysicalPlanNode,
> = BallistaCodec::default();
@@ -1505,8 +1500,8 @@ mod tests {
// test logical plan round trip
let plans = create_logical_plans(&ctx, n)?;
for plan in plans {
- let proto: protobuf::LogicalPlanNode =
- protobuf::LogicalPlanNode::try_from_logical_plan(
+ let proto: datafusion_proto::protobuf::LogicalPlanNode =
+ datafusion_proto::protobuf::LogicalPlanNode::try_from_logical_plan(
&plan,
codec.logical_extension_codec(),
)
@@ -1522,8 +1517,8 @@ mod tests {
// test optimized logical plan round trip
let plan = ctx.optimize(&plan)?;
- let proto: protobuf::LogicalPlanNode =
- protobuf::LogicalPlanNode::try_from_logical_plan(
+ let proto: datafusion_proto::protobuf::LogicalPlanNode =
+ datafusion_proto::protobuf::LogicalPlanNode::try_from_logical_plan(
&plan,
codec.logical_extension_codec(),
)