You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/06/04 15:18:16 UTC

[arrow-ballista] branch master updated: Delegate to `datafusion-proto` for logical plan serde (#57)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b1394e9 Delegate to `datafusion-proto` for logical plan serde (#57)
9b1394e9 is described below

commit 9b1394e9748b99416f9c760e0e0cfdfbeddb5b99
Author: Andy Grove <an...@nvidia.com>
AuthorDate: Sat Jun 4 09:18:12 2022 -0600

    Delegate to `datafusion-proto` for logical plan serde (#57)
---
 ballista/rust/client/Cargo.toml                    |    2 +-
 ballista/rust/client/src/context.rs                |    3 +-
 .../core/src/execution_plans/distributed_query.rs  |   29 +-
 ballista/rust/core/src/serde/logical_plan/mod.rs   | 1024 +-------------------
 ballista/rust/core/src/serde/mod.rs                |  104 +-
 ballista/rust/core/src/serde/physical_plan/mod.rs  |    3 +-
 ballista/rust/core/src/utils.rs                    |   25 +-
 ballista/rust/executor/Cargo.toml                  |    1 +
 ballista/rust/executor/src/execution_loop.rs       |   30 +-
 ballista/rust/executor/src/executor_server.rs      |    3 +-
 ballista/rust/executor/src/main.rs                 |    3 +-
 ballista/rust/executor/src/standalone.rs           |   12 +-
 ballista/rust/scheduler/Cargo.toml                 |    1 +
 ballista/rust/scheduler/src/api/handlers.rs        |    3 +-
 ballista/rust/scheduler/src/api/mod.rs             |    3 +-
 ballista/rust/scheduler/src/main.rs                |    5 +-
 ballista/rust/scheduler/src/planner.rs             |    3 +-
 .../scheduler/src/scheduler_server/event_loop.rs   |   10 +-
 .../src/scheduler_server/external_scaler.rs        |    3 +-
 .../rust/scheduler/src/scheduler_server/grpc.rs    |   19 +-
 .../rust/scheduler/src/scheduler_server/mod.rs     |   30 +-
 .../src/scheduler_server/query_stage_scheduler.rs  |   23 +-
 ballista/rust/scheduler/src/standalone.rs          |    3 +-
 ballista/rust/scheduler/src/state/mod.rs           |   24 +-
 .../rust/scheduler/src/state/persistent_state.rs   |   29 +-
 .../rust/scheduler/src/state/task_scheduler.rs     |    3 +-
 benchmarks/Cargo.toml                              |    1 +
 benchmarks/src/bin/tpch.rs                         |   51 +-
 28 files changed, 183 insertions(+), 1267 deletions(-)

diff --git a/ballista/rust/client/Cargo.toml b/ballista/rust/client/Cargo.toml
index 65f9a24f..b9138c71 100644
--- a/ballista/rust/client/Cargo.toml
+++ b/ballista/rust/client/Cargo.toml
@@ -31,8 +31,8 @@ rust-version = "1.59"
 ballista-core = { path = "../core", version = "0.7.0" }
 ballista-executor = { path = "../executor", version = "0.7.0", optional = true }
 ballista-scheduler = { path = "../scheduler", version = "0.7.0", optional = true }
-
 datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
 futures = "0.3"
 log = "0.4"
 parking_lot = "0.12"
diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs
index 4209af5e..09ff9cee 100644
--- a/ballista/rust/client/src/context.rs
+++ b/ballista/rust/client/src/context.rs
@@ -27,8 +27,9 @@ use std::sync::Arc;
 
 use ballista_core::config::BallistaConfig;
 use ballista_core::serde::protobuf::scheduler_grpc_client::SchedulerGrpcClient;
-use ballista_core::serde::protobuf::{ExecuteQueryParams, KeyValuePair, LogicalPlanNode};
+use ballista_core::serde::protobuf::{ExecuteQueryParams, KeyValuePair};
 use ballista_core::utils::create_df_ctx_with_ballista_query_planner;
+use datafusion_proto::protobuf::LogicalPlanNode;
 
 use datafusion::catalog::TableReference;
 use datafusion::dataframe::DataFrame;
diff --git a/ballista/rust/core/src/execution_plans/distributed_query.rs b/ballista/rust/core/src/execution_plans/distributed_query.rs
index 0b20acb4..62e7ff02 100644
--- a/ballista/rust/core/src/execution_plans/distributed_query.rs
+++ b/ballista/rust/core/src/execution_plans/distributed_query.rs
@@ -15,38 +15,35 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::any::Any;
-
-use std::fmt::Debug;
-use std::marker::PhantomData;
-
-use std::sync::Arc;
-use std::time::Duration;
-
 use crate::client::BallistaClient;
 use crate::config::BallistaConfig;
+use crate::serde::protobuf::execute_query_params::OptionalSessionId;
 use crate::serde::protobuf::{
     execute_query_params::Query, job_status, scheduler_grpc_client::SchedulerGrpcClient,
     ExecuteQueryParams, GetJobStatusParams, GetJobStatusResult, KeyValuePair,
     PartitionLocation,
 };
-
 use datafusion::arrow::datatypes::SchemaRef;
+use datafusion::arrow::error::{ArrowError, Result as ArrowResult};
+use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::error::{DataFusionError, Result};
+use datafusion::execution::context::TaskContext;
 use datafusion::logical_plan::LogicalPlan;
 use datafusion::physical_plan::expressions::PhysicalSortExpr;
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::{
     DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
 };
-
-use crate::serde::protobuf::execute_query_params::OptionalSessionId;
-use crate::serde::{AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec};
-use datafusion::arrow::error::{ArrowError, Result as ArrowResult};
-use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::execution::context::TaskContext;
-use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+use datafusion_proto::logical_plan::{
+    AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
+};
 use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt};
 use log::{error, info};
+use std::any::Any;
+use std::fmt::Debug;
+use std::marker::PhantomData;
+use std::sync::Arc;
+use std::time::Duration;
 
 /// This operator sends a logical plan to a Ballista scheduler for execution and
 /// polls the scheduler until the query is complete and then fetches the resulting
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index be76e538..c4532711 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -15,1014 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::error::BallistaError;
-use crate::serde::protobuf::LogicalExtensionNode;
-use crate::serde::{
-    byte_to_string, proto_error, protobuf, str_to_byte, AsLogicalPlan,
-    LogicalExtensionCodec,
-};
-use crate::{convert_required, into_logical_plan};
-use datafusion::arrow::datatypes::Schema;
-use datafusion::datasource::file_format::avro::AvroFormat;
-use datafusion::datasource::file_format::csv::CsvFormat;
-use datafusion::datasource::file_format::parquet::ParquetFormat;
-use datafusion::datasource::file_format::FileFormat;
-use datafusion::datasource::listing::{
-    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
-};
-use datafusion::logical_plan::plan::{
-    Aggregate, EmptyRelation, Filter, Join, Projection, Sort, SubqueryAlias, Window,
-};
-use datafusion::logical_plan::{
-    provider_as_source, source_as_provider, Column, CreateCatalog, CreateCatalogSchema,
-    CreateExternalTable, CreateView, CrossJoin, Expr, JoinConstraint, Limit, LogicalPlan,
-    LogicalPlanBuilder, Offset, Repartition, TableScan, Values,
-};
-use datafusion::prelude::SessionContext;
-
-use datafusion_proto::from_proto::parse_expr;
-use prost::bytes::BufMut;
-use prost::Message;
-use protobuf::listing_table_scan_node::FileFormatType;
-use protobuf::logical_plan_node::LogicalPlanType;
-use protobuf::LogicalPlanNode;
-use std::convert::TryInto;
-use std::sync::Arc;
-
 pub mod from_proto;
 
-impl AsLogicalPlan for LogicalPlanNode {
-    fn try_decode(buf: &[u8]) -> Result<Self, BallistaError>
-    where
-        Self: Sized,
-    {
-        LogicalPlanNode::decode(buf).map_err(|e| {
-            BallistaError::Internal(format!("failed to decode logical plan: {:?}", e))
-        })
-    }
-
-    fn try_encode<B>(&self, buf: &mut B) -> Result<(), BallistaError>
-    where
-        B: BufMut,
-        Self: Sized,
-    {
-        self.encode(buf).map_err(|e| {
-            BallistaError::Internal(format!("failed to encode logical plan: {:?}", e))
-        })
-    }
-
-    fn try_into_logical_plan(
-        &self,
-        ctx: &SessionContext,
-        extension_codec: &dyn LogicalExtensionCodec,
-    ) -> Result<LogicalPlan, BallistaError> {
-        let plan = self.logical_plan_type.as_ref().ok_or_else(|| {
-            proto_error(format!(
-                "logical_plan::from_proto() Unsupported logical plan '{:?}'",
-                self
-            ))
-        })?;
-        match plan {
-            LogicalPlanType::Values(values) => {
-                let n_cols = values.n_cols as usize;
-                let values: Vec<Vec<Expr>> = if values.values_list.is_empty() {
-                    Ok(Vec::new())
-                } else if values.values_list.len() % n_cols != 0 {
-                    Err(BallistaError::General(format!(
-                        "Invalid values list length, expect {} to be divisible by {}",
-                        values.values_list.len(),
-                        n_cols
-                    )))
-                } else {
-                    values
-                        .values_list
-                        .chunks_exact(n_cols)
-                        .map(|r| {
-                            r.iter().map(|expr| parse_expr(expr, ctx)).collect::<Result<
-                                Vec<_>,
-                                datafusion_proto::from_proto::Error,
-                            >>(
-                            )
-                        })
-                        .collect::<Result<Vec<_>, _>>()
-                        .map_err(|e| e.into())
-                }?;
-                LogicalPlanBuilder::values(values)?
-                    .build()
-                    .map_err(|e| e.into())
-            }
-            LogicalPlanType::Projection(projection) => {
-                let input: LogicalPlan =
-                    into_logical_plan!(projection.input, ctx, extension_codec)?;
-                let x: Vec<Expr> = projection
-                    .expr
-                    .iter()
-                    .map(|expr| parse_expr(expr, ctx))
-                    .collect::<Result<Vec<_>, _>>()?;
-                LogicalPlanBuilder::from(input)
-                    .project_with_alias(
-                        x,
-                        projection.optional_alias.as_ref().map(|a| match a {
-                            protobuf::projection_node::OptionalAlias::Alias(alias) => {
-                                alias.clone()
-                            }
-                        }),
-                    )?
-                    .build()
-                    .map_err(|e| e.into())
-            }
-            LogicalPlanType::Selection(selection) => {
-                let input: LogicalPlan =
-                    into_logical_plan!(selection.input, ctx, extension_codec)?;
-                let expr: Expr = selection
-                    .expr
-                    .as_ref()
-                    .map(|expr| parse_expr(expr, ctx))
-                    .transpose()?
-                    .ok_or_else(|| {
-                        BallistaError::General("expression required".to_string())
-                    })?;
-                // .try_into()?;
-                LogicalPlanBuilder::from(input)
-                    .filter(expr)?
-                    .build()
-                    .map_err(|e| e.into())
-            }
-            LogicalPlanType::Window(window) => {
-                let input: LogicalPlan =
-                    into_logical_plan!(window.input, ctx, extension_codec)?;
-                let window_expr = window
-                    .window_expr
-                    .iter()
-                    .map(|expr| parse_expr(expr, ctx))
-                    .collect::<Result<Vec<Expr>, _>>()?;
-                LogicalPlanBuilder::from(input)
-                    .window(window_expr)?
-                    .build()
-                    .map_err(|e| e.into())
-            }
-            LogicalPlanType::Aggregate(aggregate) => {
-                let input: LogicalPlan =
-                    into_logical_plan!(aggregate.input, ctx, extension_codec)?;
-                let group_expr = aggregate
-                    .group_expr
-                    .iter()
-                    .map(|expr| parse_expr(expr, ctx))
-                    .collect::<Result<Vec<Expr>, _>>()?;
-                let aggr_expr = aggregate
-                    .aggr_expr
-                    .iter()
-                    .map(|expr| parse_expr(expr, ctx))
-                    .collect::<Result<Vec<Expr>, _>>()?;
-                LogicalPlanBuilder::from(input)
-                    .aggregate(group_expr, aggr_expr)?
-                    .build()
-                    .map_err(|e| e.into())
-            }
-            LogicalPlanType::ListingScan(scan) => {
-                let schema: Schema = convert_required!(scan.schema)?;
-
-                let mut projection = None;
-                if let Some(columns) = &scan.projection {
-                    let column_indices = columns
-                        .columns
-                        .iter()
-                        .map(|name| schema.index_of(name))
-                        .collect::<Result<Vec<usize>, _>>()?;
-                    projection = Some(column_indices);
-                }
-
-                let filters = scan
-                    .filters
-                    .iter()
-                    .map(|expr| parse_expr(expr, ctx))
-                    .collect::<Result<Vec<_>, _>>()?;
-
-                let file_format: Arc<dyn FileFormat> =
-                    match scan.file_format_type.as_ref().ok_or_else(|| {
-                        proto_error(format!(
-                            "logical_plan::from_proto() Unsupported file format '{:?}'",
-                            self
-                        ))
-                    })? {
-                        &FileFormatType::Parquet(protobuf::ParquetFormat {
-                            enable_pruning,
-                        }) => Arc::new(
-                            ParquetFormat::default().with_enable_pruning(enable_pruning),
-                        ),
-                        FileFormatType::Csv(protobuf::CsvFormat {
-                            has_header,
-                            delimiter,
-                        }) => Arc::new(
-                            CsvFormat::default()
-                                .with_has_header(*has_header)
-                                .with_delimiter(str_to_byte(delimiter)?),
-                        ),
-                        FileFormatType::Avro(..) => Arc::new(AvroFormat::default()),
-                    };
-
-                let url = ListingTableUrl::parse(&scan.path)?;
-                let options = ListingOptions {
-                    file_extension: scan.file_extension.clone(),
-                    format: file_format,
-                    table_partition_cols: scan.table_partition_cols.clone(),
-                    collect_stat: scan.collect_stat,
-                    target_partitions: scan.target_partitions as usize,
-                };
-
-                let config = ListingTableConfig::new(url)
-                    .with_listing_options(options)
-                    .with_schema(Arc::new(schema));
-
-                let provider = ListingTable::try_new(config)?;
-
-                LogicalPlanBuilder::scan_with_filters(
-                    &scan.table_name,
-                    provider_as_source(Arc::new(provider)),
-                    projection,
-                    filters,
-                )?
-                .build()
-                .map_err(|e| e.into())
-            }
-            LogicalPlanType::Sort(sort) => {
-                let input: LogicalPlan =
-                    into_logical_plan!(sort.input, ctx, extension_codec)?;
-                let sort_expr: Vec<Expr> = sort
-                    .expr
-                    .iter()
-                    .map(|expr| parse_expr(expr, ctx))
-                    .collect::<Result<Vec<Expr>, _>>()?;
-                LogicalPlanBuilder::from(input)
-                    .sort(sort_expr)?
-                    .build()
-                    .map_err(|e| e.into())
-            }
-            LogicalPlanType::Repartition(repartition) => {
-                use datafusion::logical_plan::Partitioning;
-                let input: LogicalPlan =
-                    into_logical_plan!(repartition.input, ctx, extension_codec)?;
-                use protobuf::repartition_node::PartitionMethod;
-                let pb_partition_method = repartition.partition_method.clone().ok_or_else(|| {
-                    BallistaError::General(String::from(
-                        "Protobuf deserialization error, RepartitionNode was missing required field 'partition_method'",
-                    ))
-                })?;
-
-                let partitioning_scheme = match pb_partition_method {
-                    PartitionMethod::Hash(protobuf::HashRepartition {
-                        hash_expr: pb_hash_expr,
-                        partition_count,
-                    }) => Partitioning::Hash(
-                        pb_hash_expr
-                            .iter()
-                            .map(|expr| parse_expr(expr, ctx))
-                            .collect::<Result<Vec<_>, _>>()?,
-                        partition_count as usize,
-                    ),
-                    PartitionMethod::RoundRobin(partition_count) => {
-                        Partitioning::RoundRobinBatch(partition_count as usize)
-                    }
-                };
-
-                LogicalPlanBuilder::from(input)
-                    .repartition(partitioning_scheme)?
-                    .build()
-                    .map_err(|e| e.into())
-            }
-            LogicalPlanType::EmptyRelation(empty_relation) => {
-                LogicalPlanBuilder::empty(empty_relation.produce_one_row)
-                    .build()
-                    .map_err(|e| e.into())
-            }
-            LogicalPlanType::CreateExternalTable(create_extern_table) => {
-                let pb_schema = (create_extern_table.schema.clone()).ok_or_else(|| {
-                    BallistaError::General(String::from(
-                        "Protobuf deserialization error, CreateExternalTableNode was missing required field schema.",
-                    ))
-                })?;
-
-                let pb_file_type: protobuf::FileType =
-                    create_extern_table.file_type.try_into()?;
-
-                Ok(LogicalPlan::CreateExternalTable(CreateExternalTable {
-                    schema: pb_schema.try_into()?,
-                    name: create_extern_table.name.clone(),
-                    location: create_extern_table.location.clone(),
-                    file_type: pb_file_type.into(),
-                    has_header: create_extern_table.has_header,
-                    delimiter: create_extern_table.delimiter.chars().next().ok_or_else(|| {
-                        BallistaError::General(String::from("Protobuf deserialization error, unable to parse CSV delimiter"))
-                    })?,
-                    table_partition_cols: create_extern_table
-                        .table_partition_cols
-                        .clone(),
-                    if_not_exists: create_extern_table.if_not_exists,
-                }))
-            }
-            LogicalPlanType::CreateView(create_view) => {
-                let plan = create_view
-                    .input.clone().ok_or_else(|| BallistaError::General(String::from(
-                        "Protobuf deserialization error, CreateViewNode has invalid LogicalPlan input.",
-                    )))?
-                    .try_into_logical_plan(ctx, extension_codec)?;
-
-                Ok(LogicalPlan::CreateView(CreateView {
-                    name: create_view.name.clone(),
-                    input: Arc::new(plan),
-                    or_replace: create_view.or_replace,
-                }))
-            }
-            LogicalPlanType::CreateCatalogSchema(create_catalog_schema) => {
-                let pb_schema = (create_catalog_schema.schema.clone()).ok_or_else(|| {
-                    BallistaError::General(String::from(
-                        "Protobuf deserialization error, CreateCatalogSchemaNode was missing required field schema.",
-                    ))
-                })?;
-
-                Ok(LogicalPlan::CreateCatalogSchema(CreateCatalogSchema {
-                    schema_name: create_catalog_schema.schema_name.clone(),
-                    if_not_exists: create_catalog_schema.if_not_exists,
-                    schema: pb_schema.try_into()?,
-                }))
-            }
-            LogicalPlanType::CreateCatalog(create_catalog) => {
-                let pb_schema = (create_catalog.schema.clone()).ok_or_else(|| {
-                    BallistaError::General(String::from(
-                        "Protobuf deserialization error, CreateCatalogNode was missing required field schema.",
-                    ))
-                })?;
-
-                Ok(LogicalPlan::CreateCatalog(CreateCatalog {
-                    catalog_name: create_catalog.catalog_name.clone(),
-                    if_not_exists: create_catalog.if_not_exists,
-                    schema: pb_schema.try_into()?,
-                }))
-            }
-            LogicalPlanType::Analyze(analyze) => {
-                let input: LogicalPlan =
-                    into_logical_plan!(analyze.input, ctx, extension_codec)?;
-                LogicalPlanBuilder::from(input)
-                    .explain(analyze.verbose, true)?
-                    .build()
-                    .map_err(|e| e.into())
-            }
-            LogicalPlanType::Explain(explain) => {
-                let input: LogicalPlan =
-                    into_logical_plan!(explain.input, ctx, extension_codec)?;
-                LogicalPlanBuilder::from(input)
-                    .explain(explain.verbose, false)?
-                    .build()
-                    .map_err(|e| e.into())
-            }
-            LogicalPlanType::SubqueryAlias(aliased_relation) => {
-                let input: LogicalPlan =
-                    into_logical_plan!(aliased_relation.input, ctx, extension_codec)?;
-                LogicalPlanBuilder::from(input)
-                    .alias(&aliased_relation.alias)?
-                    .build()
-                    .map_err(|e| e.into())
-            }
-            LogicalPlanType::Limit(limit) => {
-                let input: LogicalPlan =
-                    into_logical_plan!(limit.input, ctx, extension_codec)?;
-                LogicalPlanBuilder::from(input)
-                    .limit(limit.limit as usize)?
-                    .build()
-                    .map_err(|e| e.into())
-            }
-            LogicalPlanType::Offset(offset) => {
-                let input: LogicalPlan =
-                    into_logical_plan!(offset.input, ctx, extension_codec)?;
-                LogicalPlanBuilder::from(input)
-                    .offset(offset.offset as usize)?
-                    .build()
-                    .map_err(|e| e.into())
-            }
-            LogicalPlanType::Join(join) => {
-                let left_keys: Vec<Column> =
-                    join.left_join_column.iter().map(|i| i.into()).collect();
-                let right_keys: Vec<Column> =
-                    join.right_join_column.iter().map(|i| i.into()).collect();
-                let join_type =
-                    protobuf::JoinType::from_i32(join.join_type).ok_or_else(|| {
-                        proto_error(format!(
-                            "Received a JoinNode message with unknown JoinType {}",
-                            join.join_type
-                        ))
-                    })?;
-                let join_constraint = protobuf::JoinConstraint::from_i32(
-                    join.join_constraint,
-                )
-                .ok_or_else(|| {
-                    proto_error(format!(
-                        "Received a JoinNode message with unknown JoinConstraint {}",
-                        join.join_constraint
-                    ))
-                })?;
-                let filter: Option<Expr> = join
-                    .filter
-                    .as_ref()
-                    .map(|expr| parse_expr(expr, ctx))
-                    .map_or(Ok(None), |v| v.map(Some))?;
-
-                let builder = LogicalPlanBuilder::from(into_logical_plan!(
-                    join.left,
-                    ctx,
-                    extension_codec
-                )?);
-                let builder = match join_constraint.into() {
-                    JoinConstraint::On => builder.join(
-                        &into_logical_plan!(join.right, ctx, extension_codec)?,
-                        join_type.into(),
-                        (left_keys, right_keys),
-                        filter,
-                    )?,
-                    JoinConstraint::Using => builder.join_using(
-                        &into_logical_plan!(join.right, ctx, extension_codec)?,
-                        join_type.into(),
-                        left_keys,
-                    )?,
-                };
-
-                builder.build().map_err(|e| e.into())
-            }
-            LogicalPlanType::Union(union) => {
-                let mut input_plans: Vec<LogicalPlan> = union
-                    .inputs
-                    .iter()
-                    .map(|i| i.try_into_logical_plan(ctx, extension_codec))
-                    .collect::<Result<_, BallistaError>>()?;
-
-                if input_plans.len() < 2 {
-                    return  Err( BallistaError::General(String::from(
-                       "Protobuf deserialization error, Union was require at least two input.",
-                   )));
-                }
-
-                let mut builder = LogicalPlanBuilder::from(input_plans.pop().unwrap());
-                for plan in input_plans {
-                    builder = builder.union(plan)?;
-                }
-                builder.build().map_err(|e| e.into())
-            }
-            LogicalPlanType::CrossJoin(crossjoin) => {
-                let left = into_logical_plan!(crossjoin.left, ctx, extension_codec)?;
-                let right = into_logical_plan!(crossjoin.right, ctx, extension_codec)?;
-
-                LogicalPlanBuilder::from(left)
-                    .cross_join(&right)?
-                    .build()
-                    .map_err(|e| e.into())
-            }
-            LogicalPlanType::Extension(LogicalExtensionNode { node, inputs }) => {
-                let input_plans: Vec<LogicalPlan> = inputs
-                    .iter()
-                    .map(|i| i.try_into_logical_plan(ctx, extension_codec))
-                    .collect::<Result<_, BallistaError>>()?;
-
-                let extension_node =
-                    extension_codec.try_decode(node, &input_plans, ctx)?;
-                Ok(LogicalPlan::Extension(extension_node))
-            }
-        }
-    }
-
-    fn try_from_logical_plan(
-        plan: &LogicalPlan,
-        extension_codec: &dyn LogicalExtensionCodec,
-    ) -> Result<Self, BallistaError>
-    where
-        Self: Sized,
-    {
-        match plan {
-            LogicalPlan::Values(Values { values, .. }) => {
-                let n_cols = if values.is_empty() {
-                    0
-                } else {
-                    values[0].len()
-                } as u64;
-                let values_list = values
-                    .iter()
-                    .flatten()
-                    .map(|v| v.try_into())
-                    .collect::<Result<Vec<_>, _>>()?;
-                Ok(protobuf::LogicalPlanNode {
-                    logical_plan_type: Some(LogicalPlanType::Values(
-                        protobuf::ValuesNode {
-                            n_cols,
-                            values_list,
-                        },
-                    )),
-                })
-            }
-            LogicalPlan::TableScan(TableScan {
-                table_name,
-                source,
-                filters,
-                projection,
-                ..
-            }) => {
-                let source = source_as_provider(source)?;
-                let schema = source.schema();
-                let source = source.as_any();
-
-                let projection = match projection {
-                    None => None,
-                    Some(columns) => {
-                        let column_names = columns
-                            .iter()
-                            .map(|i| schema.field(*i).name().to_owned())
-                            .collect();
-                        Some(protobuf::ProjectionColumns {
-                            columns: column_names,
-                        })
-                    }
-                };
-                let schema: datafusion_proto::protobuf::Schema = schema.as_ref().into();
-
-                let filters: Vec<datafusion_proto::protobuf::LogicalExprNode> = filters
-                    .iter()
-                    .map(|filter| filter.try_into())
-                    .collect::<Result<Vec<_>, _>>()?;
-
-                if let Some(listing_table) = source.downcast_ref::<ListingTable>() {
-                    let any = listing_table.options().format.as_any();
-                    let file_format_type = if let Some(parquet) =
-                        any.downcast_ref::<ParquetFormat>()
-                    {
-                        FileFormatType::Parquet(protobuf::ParquetFormat {
-                            enable_pruning: parquet.enable_pruning(),
-                        })
-                    } else if let Some(csv) = any.downcast_ref::<CsvFormat>() {
-                        FileFormatType::Csv(protobuf::CsvFormat {
-                            delimiter: byte_to_string(csv.delimiter())?,
-                            has_header: csv.has_header(),
-                        })
-                    } else if any.is::<AvroFormat>() {
-                        FileFormatType::Avro(protobuf::AvroFormat {})
-                    } else {
-                        return Err(proto_error(format!(
-                            "Error converting file format, {:?} is invalid as a datafusion foramt.",
-                            listing_table.options().format
-                        )));
-                    };
-                    Ok(protobuf::LogicalPlanNode {
-                        logical_plan_type: Some(LogicalPlanType::ListingScan(
-                            protobuf::ListingTableScanNode {
-                                file_format_type: Some(file_format_type),
-                                table_name: table_name.to_owned(),
-                                collect_stat: listing_table.options().collect_stat,
-                                file_extension: listing_table
-                                    .options()
-                                    .file_extension
-                                    .clone(),
-                                table_partition_cols: listing_table
-                                    .options()
-                                    .table_partition_cols
-                                    .clone(),
-                                path: listing_table.table_path().to_string(),
-                                schema: Some(schema),
-                                projection,
-                                filters,
-                                target_partitions: listing_table
-                                    .options()
-                                    .target_partitions
-                                    as u32,
-                            },
-                        )),
-                    })
-                } else {
-                    Err(BallistaError::General(format!(
-                        "logical plan to_proto unsupported table provider {:?}",
-                        source
-                    )))
-                }
-            }
-            LogicalPlan::Projection(Projection {
-                expr, input, alias, ..
-            }) => Ok(protobuf::LogicalPlanNode {
-                logical_plan_type: Some(LogicalPlanType::Projection(Box::new(
-                    protobuf::ProjectionNode {
-                        input: Some(Box::new(
-                            protobuf::LogicalPlanNode::try_from_logical_plan(
-                                input.as_ref(),
-                                extension_codec,
-                            )?,
-                        )),
-                        expr: expr.iter().map(|expr| expr.try_into()).collect::<Result<
-                            Vec<_>,
-                            datafusion_proto::to_proto::Error,
-                        >>(
-                        )?,
-                        optional_alias: alias
-                            .clone()
-                            .map(protobuf::projection_node::OptionalAlias::Alias),
-                    },
-                ))),
-            }),
-            LogicalPlan::Filter(Filter { predicate, input }) => {
-                let input: protobuf::LogicalPlanNode =
-                    protobuf::LogicalPlanNode::try_from_logical_plan(
-                        input.as_ref(),
-                        extension_codec,
-                    )?;
-                Ok(protobuf::LogicalPlanNode {
-                    logical_plan_type: Some(LogicalPlanType::Selection(Box::new(
-                        protobuf::SelectionNode {
-                            input: Some(Box::new(input)),
-                            expr: Some(predicate.try_into()?),
-                        },
-                    ))),
-                })
-            }
-            LogicalPlan::Window(Window {
-                input, window_expr, ..
-            }) => {
-                let input: protobuf::LogicalPlanNode =
-                    protobuf::LogicalPlanNode::try_from_logical_plan(
-                        input.as_ref(),
-                        extension_codec,
-                    )?;
-                Ok(protobuf::LogicalPlanNode {
-                    logical_plan_type: Some(LogicalPlanType::Window(Box::new(
-                        protobuf::WindowNode {
-                            input: Some(Box::new(input)),
-                            window_expr: window_expr
-                                .iter()
-                                .map(|expr| expr.try_into())
-                                .collect::<Result<Vec<_>, _>>()?,
-                        },
-                    ))),
-                })
-            }
-            LogicalPlan::Aggregate(Aggregate {
-                group_expr,
-                aggr_expr,
-                input,
-                ..
-            }) => {
-                let input: protobuf::LogicalPlanNode =
-                    protobuf::LogicalPlanNode::try_from_logical_plan(
-                        input.as_ref(),
-                        extension_codec,
-                    )?;
-                Ok(protobuf::LogicalPlanNode {
-                    logical_plan_type: Some(LogicalPlanType::Aggregate(Box::new(
-                        protobuf::AggregateNode {
-                            input: Some(Box::new(input)),
-                            group_expr: group_expr
-                                .iter()
-                                .map(|expr| expr.try_into())
-                                .collect::<Result<Vec<_>, _>>()?,
-                            aggr_expr: aggr_expr
-                                .iter()
-                                .map(|expr| expr.try_into())
-                                .collect::<Result<Vec<_>, _>>()?,
-                        },
-                    ))),
-                })
-            }
-            LogicalPlan::Join(Join {
-                left,
-                right,
-                on,
-                filter,
-                join_type,
-                join_constraint,
-                null_equals_null,
-                ..
-            }) => {
-                let left: protobuf::LogicalPlanNode =
-                    protobuf::LogicalPlanNode::try_from_logical_plan(
-                        left.as_ref(),
-                        extension_codec,
-                    )?;
-                let right: protobuf::LogicalPlanNode =
-                    protobuf::LogicalPlanNode::try_from_logical_plan(
-                        right.as_ref(),
-                        extension_codec,
-                    )?;
-                let (left_join_column, right_join_column) =
-                    on.iter().map(|(l, r)| (l.into(), r.into())).unzip();
-                let join_type: protobuf::JoinType = join_type.to_owned().into();
-                let join_constraint: protobuf::JoinConstraint =
-                    join_constraint.to_owned().into();
-                let filter = filter
-                    .as_ref()
-                    .map(|e| e.try_into())
-                    .map_or(Ok(None), |v| v.map(Some))?;
-
-                Ok(protobuf::LogicalPlanNode {
-                    logical_plan_type: Some(LogicalPlanType::Join(Box::new(
-                        protobuf::JoinNode {
-                            left: Some(Box::new(left)),
-                            right: Some(Box::new(right)),
-                            join_type: join_type.into(),
-                            join_constraint: join_constraint.into(),
-                            left_join_column,
-                            right_join_column,
-                            null_equals_null: *null_equals_null,
-                            filter,
-                        },
-                    ))),
-                })
-            }
-            LogicalPlan::Subquery(_) => {
-                // note that the ballista and datafusion proto files need refactoring to allow
-                // LogicalExprNode to reference a LogicalPlanNode
-                // see https://github.com/apache/arrow-datafusion/issues/2338
-                Err(BallistaError::NotImplemented(
-                    "Ballista does not support subqueries".to_string(),
-                ))
-            }
-            LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
-                let input: protobuf::LogicalPlanNode =
-                    protobuf::LogicalPlanNode::try_from_logical_plan(
-                        input.as_ref(),
-                        extension_codec,
-                    )?;
-                Ok(protobuf::LogicalPlanNode {
-                    logical_plan_type: Some(LogicalPlanType::SubqueryAlias(Box::new(
-                        protobuf::SubqueryAliasNode {
-                            input: Some(Box::new(input)),
-                            alias: alias.clone(),
-                        },
-                    ))),
-                })
-            }
-            LogicalPlan::Limit(Limit { input, n }) => {
-                let input: protobuf::LogicalPlanNode =
-                    protobuf::LogicalPlanNode::try_from_logical_plan(
-                        input.as_ref(),
-                        extension_codec,
-                    )?;
-                Ok(protobuf::LogicalPlanNode {
-                    logical_plan_type: Some(LogicalPlanType::Limit(Box::new(
-                        protobuf::LimitNode {
-                            input: Some(Box::new(input)),
-                            limit: *n as u32,
-                        },
-                    ))),
-                })
-            }
-            LogicalPlan::Offset(Offset { input, offset }) => {
-                let input: protobuf::LogicalPlanNode =
-                    protobuf::LogicalPlanNode::try_from_logical_plan(
-                        input.as_ref(),
-                        extension_codec,
-                    )?;
-                Ok(protobuf::LogicalPlanNode {
-                    logical_plan_type: Some(LogicalPlanType::Offset(Box::new(
-                        protobuf::OffsetNode {
-                            input: Some(Box::new(input)),
-                            offset: *offset as u32,
-                        },
-                    ))),
-                })
-            }
-            LogicalPlan::Sort(Sort { input, expr }) => {
-                let input: protobuf::LogicalPlanNode =
-                    protobuf::LogicalPlanNode::try_from_logical_plan(
-                        input.as_ref(),
-                        extension_codec,
-                    )?;
-                let selection_expr: Vec<datafusion_proto::protobuf::LogicalExprNode> =
-                    expr.iter()
-                        .map(|expr| expr.try_into())
-                        .collect::<Result<Vec<_>, datafusion_proto::to_proto::Error>>()?;
-                Ok(protobuf::LogicalPlanNode {
-                    logical_plan_type: Some(LogicalPlanType::Sort(Box::new(
-                        protobuf::SortNode {
-                            input: Some(Box::new(input)),
-                            expr: selection_expr,
-                        },
-                    ))),
-                })
-            }
-            LogicalPlan::Repartition(Repartition {
-                input,
-                partitioning_scheme,
-            }) => {
-                use datafusion::logical_plan::Partitioning;
-                let input: protobuf::LogicalPlanNode =
-                    protobuf::LogicalPlanNode::try_from_logical_plan(
-                        input.as_ref(),
-                        extension_codec,
-                    )?;
-
-                // Assumed common usize field was batch size
-                // Used u64 to avoid any nastyness involving large values, most data clusters are probably uniformly 64 bits any ways
-                use protobuf::repartition_node::PartitionMethod;
-
-                let pb_partition_method =
-                    match partitioning_scheme {
-                        Partitioning::Hash(exprs, partition_count) => {
-                            PartitionMethod::Hash(protobuf::HashRepartition {
-                                hash_expr: exprs
-                                    .iter()
-                                    .map(|expr| expr.try_into())
-                                    .collect::<Result<
-                                    Vec<_>,
-                                    datafusion_proto::to_proto::Error,
-                                >>()?,
-                                partition_count: *partition_count as u64,
-                            })
-                        }
-                        Partitioning::RoundRobinBatch(partition_count) => {
-                            PartitionMethod::RoundRobin(*partition_count as u64)
-                        }
-                    };
-
-                Ok(protobuf::LogicalPlanNode {
-                    logical_plan_type: Some(LogicalPlanType::Repartition(Box::new(
-                        protobuf::RepartitionNode {
-                            input: Some(Box::new(input)),
-                            partition_method: Some(pb_partition_method),
-                        },
-                    ))),
-                })
-            }
-            LogicalPlan::EmptyRelation(EmptyRelation {
-                produce_one_row, ..
-            }) => Ok(protobuf::LogicalPlanNode {
-                logical_plan_type: Some(LogicalPlanType::EmptyRelation(
-                    protobuf::EmptyRelationNode {
-                        produce_one_row: *produce_one_row,
-                    },
-                )),
-            }),
-            LogicalPlan::CreateExternalTable(CreateExternalTable {
-                name,
-                location,
-                file_type,
-                has_header,
-                delimiter,
-                schema: df_schema,
-                table_partition_cols,
-                if_not_exists,
-            }) => {
-                use datafusion::logical_plan::FileType;
-
-                let pb_file_type: protobuf::FileType = match file_type {
-                    FileType::NdJson => protobuf::FileType::NdJson,
-                    FileType::Parquet => protobuf::FileType::Parquet,
-                    FileType::CSV => protobuf::FileType::Csv,
-                    FileType::Avro => protobuf::FileType::Avro,
-                };
-
-                Ok(protobuf::LogicalPlanNode {
-                    logical_plan_type: Some(LogicalPlanType::CreateExternalTable(
-                        protobuf::CreateExternalTableNode {
-                            name: name.clone(),
-                            location: location.clone(),
-                            file_type: pb_file_type as i32,
-                            has_header: *has_header,
-                            schema: Some(df_schema.into()),
-                            table_partition_cols: table_partition_cols.clone(),
-                            if_not_exists: *if_not_exists,
-                            delimiter: String::from(*delimiter),
-                        },
-                    )),
-                })
-            }
-            LogicalPlan::CreateView(CreateView {
-                name,
-                input,
-                or_replace,
-            }) => Ok(protobuf::LogicalPlanNode {
-                logical_plan_type: Some(LogicalPlanType::CreateView(Box::new(
-                    protobuf::CreateViewNode {
-                        name: name.clone(),
-                        input: Some(Box::new(LogicalPlanNode::try_from_logical_plan(
-                            input,
-                            extension_codec,
-                        )?)),
-                        or_replace: *or_replace,
-                    },
-                ))),
-            }),
-            LogicalPlan::CreateCatalogSchema(CreateCatalogSchema {
-                schema_name,
-                if_not_exists,
-                schema: df_schema,
-            }) => Ok(protobuf::LogicalPlanNode {
-                logical_plan_type: Some(LogicalPlanType::CreateCatalogSchema(
-                    protobuf::CreateCatalogSchemaNode {
-                        schema_name: schema_name.clone(),
-                        if_not_exists: *if_not_exists,
-                        schema: Some(df_schema.into()),
-                    },
-                )),
-            }),
-            LogicalPlan::CreateCatalog(CreateCatalog {
-                catalog_name,
-                if_not_exists,
-                schema: df_schema,
-            }) => Ok(protobuf::LogicalPlanNode {
-                logical_plan_type: Some(LogicalPlanType::CreateCatalog(
-                    protobuf::CreateCatalogNode {
-                        catalog_name: catalog_name.clone(),
-                        if_not_exists: *if_not_exists,
-                        schema: Some(df_schema.into()),
-                    },
-                )),
-            }),
-            LogicalPlan::Analyze(a) => {
-                let input = protobuf::LogicalPlanNode::try_from_logical_plan(
-                    a.input.as_ref(),
-                    extension_codec,
-                )?;
-                Ok(protobuf::LogicalPlanNode {
-                    logical_plan_type: Some(LogicalPlanType::Analyze(Box::new(
-                        protobuf::AnalyzeNode {
-                            input: Some(Box::new(input)),
-                            verbose: a.verbose,
-                        },
-                    ))),
-                })
-            }
-            LogicalPlan::Explain(a) => {
-                let input = protobuf::LogicalPlanNode::try_from_logical_plan(
-                    a.plan.as_ref(),
-                    extension_codec,
-                )?;
-                Ok(protobuf::LogicalPlanNode {
-                    logical_plan_type: Some(LogicalPlanType::Explain(Box::new(
-                        protobuf::ExplainNode {
-                            input: Some(Box::new(input)),
-                            verbose: a.verbose,
-                        },
-                    ))),
-                })
-            }
-            LogicalPlan::Union(union) => {
-                let inputs: Vec<LogicalPlanNode> = union
-                    .inputs
-                    .iter()
-                    .map(|i| {
-                        protobuf::LogicalPlanNode::try_from_logical_plan(
-                            i,
-                            extension_codec,
-                        )
-                    })
-                    .collect::<Result<_, BallistaError>>()?;
-                Ok(protobuf::LogicalPlanNode {
-                    logical_plan_type: Some(LogicalPlanType::Union(
-                        protobuf::UnionNode { inputs },
-                    )),
-                })
-            }
-            LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
-                let left = protobuf::LogicalPlanNode::try_from_logical_plan(
-                    left.as_ref(),
-                    extension_codec,
-                )?;
-                let right = protobuf::LogicalPlanNode::try_from_logical_plan(
-                    right.as_ref(),
-                    extension_codec,
-                )?;
-                Ok(protobuf::LogicalPlanNode {
-                    logical_plan_type: Some(LogicalPlanType::CrossJoin(Box::new(
-                        protobuf::CrossJoinNode {
-                            left: Some(Box::new(left)),
-                            right: Some(Box::new(right)),
-                        },
-                    ))),
-                })
-            }
-            LogicalPlan::Extension(extension) => {
-                let mut buf: Vec<u8> = vec![];
-                extension_codec.try_encode(extension, &mut buf)?;
-
-                let inputs: Vec<LogicalPlanNode> = extension
-                    .node
-                    .inputs()
-                    .iter()
-                    .map(|i| {
-                        protobuf::LogicalPlanNode::try_from_logical_plan(
-                            i,
-                            extension_codec,
-                        )
-                    })
-                    .collect::<Result<_, BallistaError>>()?;
-
-                Ok(protobuf::LogicalPlanNode {
-                    logical_plan_type: Some(LogicalPlanType::Extension(
-                        LogicalExtensionNode { node: buf, inputs },
-                    )),
-                })
-            }
-            LogicalPlan::CreateMemoryTable(_) => Err(proto_error(
-                "Error converting CreateMemoryTable. Not yet supported in Ballista",
-            )),
-            LogicalPlan::DropTable(_) => Err(proto_error(
-                "Error converting DropTable. Not yet supported in Ballista",
-            )),
-        }
-    }
-}
-
 #[macro_export]
 macro_rules! into_logical_plan {
     ($PB:expr, $CTX:expr, $CODEC:expr) => {{
@@ -1116,11 +110,11 @@ mod roundtrip_tests {
         ($initial_struct:ident) => {
             let ctx = SessionContext::new();
             let codec: BallistaCodec<
-                protobuf::LogicalPlanNode,
+                datafusion_proto::protobuf::LogicalPlanNode,
                 protobuf::PhysicalPlanNode,
             > = BallistaCodec::default();
-            let proto: protobuf::LogicalPlanNode =
-                protobuf::LogicalPlanNode::try_from_logical_plan(
+            let proto: datafusion_proto::protobuf::LogicalPlanNode =
+                datafusion_proto::protobuf::LogicalPlanNode::try_from_logical_plan(
                     &$initial_struct,
                     codec.logical_extension_codec(),
                 )
@@ -1139,7 +133,7 @@ mod roundtrip_tests {
                 protobuf::LogicalPlanNode,
                 protobuf::PhysicalPlanNode,
             > = BallistaCodec::default();
-            let proto: protobuf::LogicalPlanNode =
+            let proto: datafusion_proto::protobuf::LogicalPlanNode =
                 protobuf::LogicalPlanNode::try_from_logical_plan(&$initial_struct)
                     .expect("from logical plan");
             let round_trip: LogicalPlan = proto
@@ -1336,8 +330,10 @@ mod roundtrip_tests {
     #[tokio::test]
     async fn roundtrip_logical_plan_custom_ctx() -> Result<()> {
         let ctx = SessionContext::new();
-        let codec: BallistaCodec<protobuf::LogicalPlanNode, protobuf::PhysicalPlanNode> =
-            BallistaCodec::default();
+        let codec: BallistaCodec<
+            datafusion_proto::protobuf::LogicalPlanNode,
+            protobuf::PhysicalPlanNode,
+        > = BallistaCodec::default();
         let custom_object_store = Arc::new(TestObjectStore {});
         ctx.runtime_env()
             .register_object_store("test", custom_object_store.clone());
@@ -1357,8 +353,8 @@ mod roundtrip_tests {
             .await?
             .to_logical_plan()?;
 
-        let proto: protobuf::LogicalPlanNode =
-            protobuf::LogicalPlanNode::try_from_logical_plan(
+        let proto: datafusion_proto::protobuf::LogicalPlanNode =
+            datafusion_proto::protobuf::LogicalPlanNode::try_from_logical_plan(
                 &plan,
                 codec.logical_extension_codec(),
             )
diff --git a/ballista/rust/core/src/serde/mod.rs b/ballista/rust/core/src/serde/mod.rs
index b34b5cb4..a955c099 100644
--- a/ballista/rust/core/src/serde/mod.rs
+++ b/ballista/rust/core/src/serde/mod.rs
@@ -18,24 +18,20 @@
 //! This crate contains code generated from the Ballista Protocol Buffer Definition as well
 //! as convenience code for interacting with the generated code.
 
-use prost::bytes::BufMut;
-use std::fmt::Debug;
-use std::marker::PhantomData;
-use std::sync::Arc;
-use std::{convert::TryInto, io::Cursor};
-
-use datafusion::logical_plan::{
-    FunctionRegistry, JoinConstraint, JoinType, LogicalPlan, Operator,
-};
-
 use crate::{error::BallistaError, serde::scheduler::Action as BallistaAction};
-
 use datafusion::execution::runtime_env::RuntimeEnv;
-use datafusion::logical_plan::plan::Extension;
+use datafusion::logical_plan::{FunctionRegistry, JoinConstraint, JoinType, Operator};
 use datafusion::physical_plan::join_utils::JoinSide;
 use datafusion::physical_plan::ExecutionPlan;
-use datafusion::prelude::SessionContext;
+use datafusion_proto::logical_plan::{
+    AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
+};
+use prost::bytes::BufMut;
 use prost::Message;
+use std::fmt::Debug;
+use std::marker::PhantomData;
+use std::sync::Arc;
+use std::{convert::TryInto, io::Cursor};
 
 // include the generated protobuf source as a submodule
 #[allow(clippy::all)]
@@ -59,71 +55,6 @@ pub(crate) fn proto_error<S: Into<String>>(message: S) -> BallistaError {
     BallistaError::General(message.into())
 }
 
-pub trait AsLogicalPlan: Debug + Send + Sync + Clone {
-    fn try_decode(buf: &[u8]) -> Result<Self, BallistaError>
-    where
-        Self: Sized;
-
-    fn try_encode<B>(&self, buf: &mut B) -> Result<(), BallistaError>
-    where
-        B: BufMut,
-        Self: Sized;
-
-    fn try_into_logical_plan(
-        &self,
-        ctx: &SessionContext,
-        extension_codec: &dyn LogicalExtensionCodec,
-    ) -> Result<LogicalPlan, BallistaError>;
-
-    fn try_from_logical_plan(
-        plan: &LogicalPlan,
-        extension_codec: &dyn LogicalExtensionCodec,
-    ) -> Result<Self, BallistaError>
-    where
-        Self: Sized;
-}
-
-pub trait LogicalExtensionCodec: Debug + Send + Sync {
-    fn try_decode(
-        &self,
-        buf: &[u8],
-        inputs: &[LogicalPlan],
-        ctx: &SessionContext,
-    ) -> Result<Extension, BallistaError>;
-
-    fn try_encode(
-        &self,
-        node: &Extension,
-        buf: &mut Vec<u8>,
-    ) -> Result<(), BallistaError>;
-}
-
-#[derive(Debug, Clone)]
-pub struct DefaultLogicalExtensionCodec {}
-
-impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
-    fn try_decode(
-        &self,
-        _buf: &[u8],
-        _inputs: &[LogicalPlan],
-        _ctx: &SessionContext,
-    ) -> Result<Extension, BallistaError> {
-        Err(BallistaError::NotImplemented(
-            "LogicalExtensionCodec is not provided".to_string(),
-        ))
-    }
-
-    fn try_encode(
-        &self,
-        _node: &Extension,
-        _buf: &mut Vec<u8>,
-    ) -> Result<(), BallistaError> {
-        Err(BallistaError::NotImplemented(
-            "LogicalExtensionCodec is not provided".to_string(),
-        ))
-    }
-}
-
 pub trait AsExecutionPlan: Debug + Send + Sync + Clone {
     fn try_decode(buf: &[u8]) -> Result<Self, BallistaError>
     where
@@ -412,10 +343,11 @@ mod tests {
     }
 
     use crate::error::BallistaError;
-    use crate::serde::protobuf::{LogicalPlanNode, PhysicalPlanNode};
+    use crate::serde::protobuf::PhysicalPlanNode;
     use crate::serde::{
         AsExecutionPlan, AsLogicalPlan, LogicalExtensionCodec, PhysicalExtensionCodec,
     };
+    use datafusion_proto::protobuf::LogicalPlanNode;
     use proto::{TopKExecProto, TopKPlanProto};
 
     struct TopKPlanNode {
@@ -620,10 +552,10 @@ mod tests {
             buf: &[u8],
             inputs: &[LogicalPlan],
             ctx: &SessionContext,
-        ) -> Result<Extension, BallistaError> {
+        ) -> Result<Extension, DataFusionError> {
             if let Some((input, _)) = inputs.split_first() {
                 let proto = TopKPlanProto::decode(buf).map_err(|e| {
-                    BallistaError::Internal(format!(
+                    DataFusionError::Internal(format!(
                         "failed to decode logical plan: {:?}",
                         e
                     ))
@@ -640,10 +572,10 @@ mod tests {
                         node: Arc::new(node),
                     })
                 } else {
-                    Err(BallistaError::from("invalid plan, no expr".to_string()))
+                    Err(DataFusionError::Plan("invalid plan, no expr".to_string()))
                 }
             } else {
-                Err(BallistaError::from("invalid plan, no input".to_string()))
+                Err(DataFusionError::Plan("invalid plan, no input".to_string()))
             }
         }
 
@@ -651,7 +583,7 @@ mod tests {
             &self,
             node: &Extension,
             buf: &mut Vec<u8>,
-        ) -> Result<(), BallistaError> {
+        ) -> Result<(), DataFusionError> {
             if let Some(exec) = node.node.as_any().downcast_ref::<TopKPlanNode>() {
                 let proto = TopKPlanProto {
                     k: exec.k as u64,
@@ -659,7 +591,7 @@ mod tests {
                 };
 
                 proto.encode(buf).map_err(|e| {
-                    BallistaError::Internal(format!(
+                    DataFusionError::Internal(format!(
                         "failed to encode logical plan: {:?}",
                         e
                     ))
@@ -667,7 +599,7 @@ mod tests {
 
                 Ok(())
             } else {
-                Err(BallistaError::from("unsupported plan type".to_string()))
+                Err(DataFusionError::Plan("unsupported plan type".to_string()))
             }
         }
     }
diff --git a/ballista/rust/core/src/serde/physical_plan/mod.rs b/ballista/rust/core/src/serde/physical_plan/mod.rs
index a0b12391..cbe8c984 100644
--- a/ballista/rust/core/src/serde/physical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/physical_plan/mod.rs
@@ -1160,8 +1160,9 @@ mod roundtrip_tests {
     };
 
     use crate::execution_plans::ShuffleWriterExec;
-    use crate::serde::protobuf::{LogicalPlanNode, PhysicalPlanNode};
+    use crate::serde::protobuf::PhysicalPlanNode;
     use crate::serde::{AsExecutionPlan, BallistaCodec};
+    use datafusion_proto::protobuf::LogicalPlanNode;
 
     use super::super::super::error::Result;
     use super::super::protobuf;
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index 1418aecb..7493b1ee 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -15,21 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::io::{BufWriter, Write};
-use std::marker::PhantomData;
-
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::Arc;
-use std::{fs::File, pin::Pin};
-
+use crate::config::BallistaConfig;
 use crate::error::{BallistaError, Result};
 use crate::execution_plans::{
     DistributedQueryExec, ShuffleWriterExec, UnresolvedShuffleExec,
 };
 use crate::serde::scheduler::PartitionStats;
-
-use crate::config::BallistaConfig;
-use crate::serde::{AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec};
 use async_trait::async_trait;
 use datafusion::arrow::datatypes::Schema;
 use datafusion::arrow::{ipc::writer::FileWriter, record_batch::RecordBatch};
@@ -37,22 +28,28 @@ use datafusion::error::DataFusionError;
 use datafusion::execution::context::{
     QueryPlanner, SessionConfig, SessionContext, SessionState,
 };
+use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
 use datafusion::logical_plan::LogicalPlan;
-
+use datafusion::physical_plan::aggregates::AggregateExec;
 use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::common::batch_byte_size;
 use datafusion::physical_plan::empty::EmptyExec;
-
-use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
-use datafusion::physical_plan::aggregates::AggregateExec;
 use datafusion::physical_plan::file_format::{CsvExec, ParquetExec};
 use datafusion::physical_plan::filter::FilterExec;
 use datafusion::physical_plan::hash_join::HashJoinExec;
 use datafusion::physical_plan::projection::ProjectionExec;
 use datafusion::physical_plan::sorts::sort::SortExec;
 use datafusion::physical_plan::{metrics, ExecutionPlan, RecordBatchStream};
+use datafusion_proto::logical_plan::{
+    AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
+};
 use futures::StreamExt;
+use std::io::{BufWriter, Write};
+use std::marker::PhantomData;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+use std::{fs::File, pin::Pin};
 
 /// Stream data to disk in Arrow IPC format
 
diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml
index cc3e8f19..439d19d5 100644
--- a/ballista/rust/executor/Cargo.toml
+++ b/ballista/rust/executor/Cargo.toml
@@ -41,6 +41,7 @@ ballista-core = { path = "../core", version = "0.7.0" }
 chrono = { version = "0.4", default-features = false }
 configure_me = "0.4.0"
 datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
 env_logger = "0.9"
 futures = "0.3"
 hyper = "0.14.4"
diff --git a/ballista/rust/executor/src/execution_loop.rs b/ballista/rust/executor/src/execution_loop.rs
index 06128f8d..e1a55a3e 100644
--- a/ballista/rust/executor/src/execution_loop.rs
+++ b/ballista/rust/executor/src/execution_loop.rs
@@ -15,28 +15,26 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::collections::HashMap;
-use std::ops::Deref;
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::mpsc::{Receiver, Sender, TryRecvError};
-use std::{sync::Arc, time::Duration};
-
-use datafusion::physical_plan::ExecutionPlan;
-use log::{debug, error, info, warn};
-use tonic::transport::Channel;
-
-use ballista_core::serde::protobuf::{
-    scheduler_grpc_client::SchedulerGrpcClient, PollWorkParams, PollWorkResult,
-    TaskDefinition, TaskStatus,
-};
-
 use crate::as_task_status;
 use crate::executor::Executor;
 use ballista_core::error::BallistaError;
 use ballista_core::serde::physical_plan::from_proto::parse_protobuf_hash_partitioning;
+use ballista_core::serde::protobuf::{
+    scheduler_grpc_client::SchedulerGrpcClient, PollWorkParams, PollWorkResult,
+    TaskDefinition, TaskStatus,
+};
 use ballista_core::serde::scheduler::ExecutorSpecification;
-use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan, BallistaCodec};
+use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
 use datafusion::execution::context::TaskContext;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion_proto::logical_plan::AsLogicalPlan;
+use log::{debug, error, info, warn};
+use std::collections::HashMap;
+use std::ops::Deref;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::mpsc::{Receiver, Sender, TryRecvError};
+use std::{sync::Arc, time::Duration};
+use tonic::transport::Channel;
 
 pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
     mut scheduler: SchedulerGrpcClient<Channel>,
diff --git a/ballista/rust/executor/src/executor_server.rs b/ballista/rust/executor/src/executor_server.rs
index 11a5c755..de202a54 100644
--- a/ballista/rust/executor/src/executor_server.rs
+++ b/ballista/rust/executor/src/executor_server.rs
@@ -37,9 +37,10 @@ use ballista_core::serde::protobuf::{
     StopExecutorParams, StopExecutorResult, TaskDefinition, UpdateTaskStatusParams,
 };
 use ballista_core::serde::scheduler::ExecutorState;
-use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan, BallistaCodec};
+use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
 use datafusion::execution::context::TaskContext;
 use datafusion::physical_plan::ExecutionPlan;
+use datafusion_proto::logical_plan::AsLogicalPlan;
 
 use crate::as_task_status;
 use crate::cpu_bound_executor::DedicatedExecutor;
diff --git a/ballista/rust/executor/src/main.rs b/ballista/rust/executor/src/main.rs
index 825ddd4d..f15d625c 100644
--- a/ballista/rust/executor/src/main.rs
+++ b/ballista/rust/executor/src/main.rs
@@ -35,7 +35,7 @@ use ballista_core::config::TaskSchedulingPolicy;
 use ballista_core::error::BallistaError;
 use ballista_core::serde::protobuf::{
     executor_registration, scheduler_grpc_client::SchedulerGrpcClient,
-    ExecutorRegistration, LogicalPlanNode, PhysicalPlanNode,
+    ExecutorRegistration, PhysicalPlanNode,
 };
 use ballista_core::serde::scheduler::ExecutorSpecification;
 use ballista_core::serde::BallistaCodec;
@@ -45,6 +45,7 @@ use ballista_executor::flight_service::BallistaFlightService;
 use ballista_executor::metrics::LoggingMetricsCollector;
 use config::prelude::*;
 use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+use datafusion_proto::protobuf::LogicalPlanNode;
 
 #[macro_use]
 extern crate configure_me;
diff --git a/ballista/rust/executor/src/standalone.rs b/ballista/rust/executor/src/standalone.rs
index d68052af..204ce95f 100644
--- a/ballista/rust/executor/src/standalone.rs
+++ b/ballista/rust/executor/src/standalone.rs
@@ -15,12 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::sync::Arc;
-
+use crate::metrics::LoggingMetricsCollector;
+use crate::{execution_loop, executor::Executor, flight_service::BallistaFlightService};
 use arrow_flight::flight_service_server::FlightServiceServer;
-
 use ballista_core::serde::scheduler::ExecutorSpecification;
-use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan, BallistaCodec};
+use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
 use ballista_core::{
     error::Result,
     serde::protobuf::executor_registration::OptionalHost,
@@ -28,15 +27,14 @@ use ballista_core::{
     BALLISTA_VERSION,
 };
 use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+use datafusion_proto::logical_plan::AsLogicalPlan;
 use log::info;
+use std::sync::Arc;
 use tempfile::TempDir;
 use tokio::net::TcpListener;
 use tonic::transport::{Channel, Server};
 use uuid::Uuid;
 
-use crate::metrics::LoggingMetricsCollector;
-use crate::{execution_loop, executor::Executor, flight_service::BallistaFlightService};
-
 pub async fn new_standalone_executor<
     T: 'static + AsLogicalPlan,
     U: 'static + AsExecutionPlan,
diff --git a/ballista/rust/scheduler/Cargo.toml b/ballista/rust/scheduler/Cargo.toml
index 283bcce5..aa335e61 100644
--- a/ballista/rust/scheduler/Cargo.toml
+++ b/ballista/rust/scheduler/Cargo.toml
@@ -42,6 +42,7 @@ ballista-core = { path = "../core", version = "0.7.0" }
 clap = { version = "3", features = ["derive", "cargo"] }
 configure_me = "0.4.0"
 datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
 env_logger = "0.9"
 etcd-client = { version = "0.9", optional = true }
 futures = "0.3"
diff --git a/ballista/rust/scheduler/src/api/handlers.rs b/ballista/rust/scheduler/src/api/handlers.rs
index 72f17445..b6c322dc 100644
--- a/ballista/rust/scheduler/src/api/handlers.rs
+++ b/ballista/rust/scheduler/src/api/handlers.rs
@@ -11,8 +11,9 @@
 // limitations under the License.
 
 use crate::scheduler_server::SchedulerServer;
-use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan};
+use ballista_core::serde::AsExecutionPlan;
 use ballista_core::BALLISTA_VERSION;
+use datafusion_proto::logical_plan::AsLogicalPlan;
 use warp::Rejection;
 
 #[derive(Debug, serde::Serialize)]
diff --git a/ballista/rust/scheduler/src/api/mod.rs b/ballista/rust/scheduler/src/api/mod.rs
index 98b70463..c3eba901 100644
--- a/ballista/rust/scheduler/src/api/mod.rs
+++ b/ballista/rust/scheduler/src/api/mod.rs
@@ -14,7 +14,8 @@ mod handlers;
 
 use crate::scheduler_server::SchedulerServer;
 use anyhow::Result;
-use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan};
+use ballista_core::serde::AsExecutionPlan;
+use datafusion_proto::logical_plan::AsLogicalPlan;
 use std::{
     pin::Pin,
     task::{Context as TaskContext, Poll},
diff --git a/ballista/rust/scheduler/src/main.rs b/ballista/rust/scheduler/src/main.rs
index 43048218..f9f3b6ad 100644
--- a/ballista/rust/scheduler/src/main.rs
+++ b/ballista/rust/scheduler/src/main.rs
@@ -30,15 +30,14 @@ use tower::Service;
 use ballista_core::BALLISTA_VERSION;
 use ballista_core::{
     print_version,
-    serde::protobuf::{
-        scheduler_grpc_server::SchedulerGrpcServer, LogicalPlanNode, PhysicalPlanNode,
-    },
+    serde::protobuf::{scheduler_grpc_server::SchedulerGrpcServer, PhysicalPlanNode},
 };
 use ballista_scheduler::api::{get_routes, EitherBody, Error};
 #[cfg(feature = "etcd")]
 use ballista_scheduler::state::backend::etcd::EtcdClient;
 #[cfg(feature = "sled")]
 use ballista_scheduler::state::backend::standalone::StandaloneClient;
+use datafusion_proto::protobuf::LogicalPlanNode;
 
 use ballista_scheduler::scheduler_server::SchedulerServer;
 use ballista_scheduler::state::backend::{StateBackend, StateBackendClient};
diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs
index af1ce1c6..33dc3ac0 100644
--- a/ballista/rust/scheduler/src/planner.rs
+++ b/ballista/rust/scheduler/src/planner.rs
@@ -287,7 +287,8 @@ mod test {
     use datafusion::prelude::SessionContext;
     use std::ops::Deref;
 
-    use ballista_core::serde::protobuf::{LogicalPlanNode, PhysicalPlanNode};
+    use ballista_core::serde::protobuf::PhysicalPlanNode;
+    use datafusion_proto::protobuf::LogicalPlanNode;
     use std::sync::Arc;
     use uuid::Uuid;
 
diff --git a/ballista/rust/scheduler/src/scheduler_server/event_loop.rs b/ballista/rust/scheduler/src/scheduler_server/event_loop.rs
index c343743c..5ba830e8 100644
--- a/ballista/rust/scheduler/src/scheduler_server/event_loop.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/event_loop.rs
@@ -22,15 +22,15 @@ use async_trait::async_trait;
 use log::{debug, warn};
 
 use crate::scheduler_server::event::SchedulerServerEvent;
+use crate::scheduler_server::ExecutorsClient;
+use crate::state::task_scheduler::TaskScheduler;
+use crate::state::SchedulerState;
 use ballista_core::error::{BallistaError, Result};
 use ballista_core::event_loop::EventAction;
 use ballista_core::serde::protobuf::{LaunchTaskParams, TaskDefinition};
 use ballista_core::serde::scheduler::ExecutorDataChange;
-use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan};
-
-use crate::scheduler_server::ExecutorsClient;
-use crate::state::task_scheduler::TaskScheduler;
-use crate::state::SchedulerState;
+use ballista_core::serde::AsExecutionPlan;
+use datafusion_proto::logical_plan::AsLogicalPlan;
 
 pub(crate) struct SchedulerServerEventAction<
     T: 'static + AsLogicalPlan,
diff --git a/ballista/rust/scheduler/src/scheduler_server/external_scaler.rs b/ballista/rust/scheduler/src/scheduler_server/external_scaler.rs
index 4b3966df..1b8d42c2 100644
--- a/ballista/rust/scheduler/src/scheduler_server/external_scaler.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/external_scaler.rs
@@ -20,7 +20,8 @@ use crate::scheduler_server::externalscaler::{
     GetMetricsResponse, IsActiveResponse, MetricSpec, MetricValue, ScaledObjectRef,
 };
 use crate::scheduler_server::SchedulerServer;
-use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan};
+use ballista_core::serde::AsExecutionPlan;
+use datafusion_proto::logical_plan::AsLogicalPlan;
 use log::debug;
 use tonic::{Request, Response};
 
diff --git a/ballista/rust/scheduler/src/scheduler_server/grpc.rs b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
index 9216155d..01c2058b 100644
--- a/ballista/rust/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
@@ -15,6 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::scheduler_server::event::QueryStageSchedulerEvent;
+use crate::scheduler_server::{
+    create_datafusion_context, update_datafusion_context, SchedulerServer,
+};
+use crate::state::task_scheduler::TaskScheduler;
 use anyhow::Context;
 use ballista_core::config::{BallistaConfig, TaskSchedulingPolicy};
 use ballista_core::error::BallistaError;
@@ -32,12 +37,13 @@ use ballista_core::serde::protobuf::{
 use ballista_core::serde::scheduler::{
     ExecutorData, ExecutorDataChange, ExecutorMetadata,
 };
-use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan};
+use ballista_core::serde::AsExecutionPlan;
 use datafusion::datafusion_data_access::object_store::{
     local::LocalFileSystem, ObjectStore,
 };
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::file_format::FileFormat;
+use datafusion_proto::logical_plan::AsLogicalPlan;
 use futures::TryStreamExt;
 use log::{debug, error, info, trace, warn};
 use rand::{distributions::Alphanumeric, thread_rng, Rng};
@@ -48,12 +54,6 @@ use std::time::Instant;
 use std::time::{SystemTime, UNIX_EPOCH};
 use tonic::{Request, Response, Status};
 
-use crate::scheduler_server::event::QueryStageSchedulerEvent;
-use crate::scheduler_server::{
-    create_datafusion_context, update_datafusion_context, SchedulerServer,
-};
-use crate::state::task_scheduler::TaskScheduler;
-
 #[tonic::async_trait]
 impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
     for SchedulerServer<T, U>
@@ -561,12 +561,13 @@ mod test {
     use crate::state::{backend::standalone::StandaloneClient, SchedulerState};
     use ballista_core::error::BallistaError;
     use ballista_core::serde::protobuf::{
-        executor_registration::OptionalHost, ExecutorRegistration, LogicalPlanNode,
-        PhysicalPlanNode, PollWorkParams,
+        executor_registration::OptionalHost, ExecutorRegistration, PhysicalPlanNode,
+        PollWorkParams,
     };
     use ballista_core::serde::scheduler::ExecutorSpecification;
     use ballista_core::serde::BallistaCodec;
     use datafusion::execution::context::default_session_builder;
+    use datafusion_proto::protobuf::LogicalPlanNode;
 
     use super::{SchedulerGrpc, SchedulerServer};
 
diff --git a/ballista/rust/scheduler/src/scheduler_server/mod.rs b/ballista/rust/scheduler/src/scheduler_server/mod.rs
index d61f570c..68dede14 100644
--- a/ballista/rust/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/mod.rs
@@ -15,27 +15,25 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::collections::HashMap;
-use std::sync::Arc;
-use std::time::{SystemTime, UNIX_EPOCH};
-
-use tokio::sync::RwLock;
-use tonic::transport::Channel;
-
+use crate::scheduler_server::event::{QueryStageSchedulerEvent, SchedulerServerEvent};
+use crate::scheduler_server::event_loop::SchedulerServerEventAction;
+use crate::scheduler_server::query_stage_scheduler::QueryStageScheduler;
+use crate::state::backend::StateBackendClient;
+use crate::state::SchedulerState;
 use ballista_core::config::{BallistaConfig, TaskSchedulingPolicy};
 use ballista_core::error::Result;
 use ballista_core::event_loop::EventLoop;
 use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient;
 use ballista_core::serde::protobuf::TaskStatus;
-use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan, BallistaCodec};
+use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
 use datafusion::execution::context::{default_session_builder, SessionState};
 use datafusion::prelude::{SessionConfig, SessionContext};
-
-use crate::scheduler_server::event::{QueryStageSchedulerEvent, SchedulerServerEvent};
-use crate::scheduler_server::event_loop::SchedulerServerEventAction;
-use crate::scheduler_server::query_stage_scheduler::QueryStageScheduler;
-use crate::state::backend::StateBackendClient;
-use crate::state::SchedulerState;
+use datafusion_proto::logical_plan::AsLogicalPlan;
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::{SystemTime, UNIX_EPOCH};
+use tokio::sync::RwLock;
+use tonic::transport::Channel;
 
 // include the generated protobuf source as a submodule
 #[allow(clippy::all)]
@@ -288,8 +286,7 @@ mod test {
     use ballista_core::error::{BallistaError, Result};
     use ballista_core::execution_plans::ShuffleWriterExec;
     use ballista_core::serde::protobuf::{
-        job_status, task_status, CompletedTask, LogicalPlanNode, PartitionId,
-        PhysicalPlanNode, TaskStatus,
+        job_status, task_status, CompletedTask, PartitionId, PhysicalPlanNode, TaskStatus,
     };
     use ballista_core::serde::scheduler::ExecutorData;
     use ballista_core::serde::BallistaCodec;
@@ -298,6 +295,7 @@ mod test {
     use datafusion::logical_plan::{col, sum, LogicalPlan};
     use datafusion::prelude::{SessionConfig, SessionContext};
     use datafusion::test_util::scan_empty;
+    use datafusion_proto::protobuf::LogicalPlanNode;
 
     use crate::scheduler_server::event::QueryStageSchedulerEvent;
     use crate::scheduler_server::SchedulerServer;
diff --git a/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs b/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
index 58e9f8f3..fe1c2d5d 100644
--- a/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -15,15 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::collections::{HashMap, HashSet};
-use std::sync::Arc;
-
+use crate::planner::{
+    find_unresolved_shuffles, remove_unresolved_shuffles, DistributedPlanner,
+};
+use crate::scheduler_server::event::{QueryStageSchedulerEvent, SchedulerServerEvent};
+use crate::state::SchedulerState;
 use async_recursion::async_recursion;
 use async_trait::async_trait;
-use log::{debug, error, info, warn};
-
 use ballista_core::error::{BallistaError, Result};
-
 use ballista_core::event_loop::{EventAction, EventSender};
 use ballista_core::execution_plans::UnresolvedShuffleExec;
 use ballista_core::serde::protobuf::{
@@ -31,14 +30,12 @@ use ballista_core::serde::protobuf::{
     JobStatus, RunningJob, TaskStatus,
 };
 use ballista_core::serde::scheduler::{ExecutorMetadata, PartitionStats};
-use ballista_core::serde::{protobuf, AsExecutionPlan, AsLogicalPlan};
+use ballista_core::serde::{protobuf, AsExecutionPlan};
 use datafusion::physical_plan::ExecutionPlan;
-
-use crate::planner::{
-    find_unresolved_shuffles, remove_unresolved_shuffles, DistributedPlanner,
-};
-use crate::scheduler_server::event::{QueryStageSchedulerEvent, SchedulerServerEvent};
-use crate::state::SchedulerState;
+use datafusion_proto::logical_plan::AsLogicalPlan;
+use log::{debug, error, info, warn};
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
 
 pub(crate) struct QueryStageScheduler<
     T: 'static + AsLogicalPlan,
diff --git a/ballista/rust/scheduler/src/standalone.rs b/ballista/rust/scheduler/src/standalone.rs
index f333587e..4f358a46 100644
--- a/ballista/rust/scheduler/src/standalone.rs
+++ b/ballista/rust/scheduler/src/standalone.rs
@@ -15,12 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use ballista_core::serde::protobuf::{LogicalPlanNode, PhysicalPlanNode};
+use ballista_core::serde::protobuf::PhysicalPlanNode;
 use ballista_core::serde::BallistaCodec;
 use ballista_core::{
     error::Result, serde::protobuf::scheduler_grpc_server::SchedulerGrpcServer,
     BALLISTA_VERSION,
 };
+use datafusion_proto::protobuf::LogicalPlanNode;
 use log::info;
 use std::{net::SocketAddr, sync::Arc};
 use tokio::net::TcpListener;
diff --git a/ballista/rust/scheduler/src/state/mod.rs b/ballista/rust/scheduler/src/state/mod.rs
index 5f301346..de780fab 100644
--- a/ballista/rust/scheduler/src/state/mod.rs
+++ b/ballista/rust/scheduler/src/state/mod.rs
@@ -15,23 +15,20 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::collections::HashMap;
-use std::sync::Arc;
-use std::time::{Duration, SystemTime, UNIX_EPOCH};
-
-use datafusion::physical_plan::ExecutionPlan;
-
-use ballista_core::error::Result;
-
 use crate::scheduler_server::{SessionBuilder, SessionContextRegistry};
-use ballista_core::serde::protobuf::{ExecutorHeartbeat, JobStatus, KeyValuePair};
-use ballista_core::serde::scheduler::ExecutorMetadata;
-use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan, BallistaCodec};
-
 use crate::state::backend::StateBackendClient;
 use crate::state::executor_manager::ExecutorManager;
 use crate::state::persistent_state::PersistentSchedulerState;
 use crate::state::stage_manager::StageManager;
+use ballista_core::error::Result;
+use ballista_core::serde::protobuf::{ExecutorHeartbeat, JobStatus, KeyValuePair};
+use ballista_core::serde::scheduler::ExecutorMetadata;
+use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion_proto::logical_plan::AsLogicalPlan;
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
 
 pub mod backend;
 mod executor_manager;
@@ -181,11 +178,12 @@ mod test {
 
     use ballista_core::error::BallistaError;
     use ballista_core::serde::protobuf::{
-        job_status, JobStatus, LogicalPlanNode, PhysicalPlanNode, QueuedJob,
+        job_status, JobStatus, PhysicalPlanNode, QueuedJob,
     };
     use ballista_core::serde::scheduler::{ExecutorMetadata, ExecutorSpecification};
     use ballista_core::serde::BallistaCodec;
     use datafusion::execution::context::default_session_builder;
+    use datafusion_proto::protobuf::LogicalPlanNode;
 
     use super::{backend::standalone::StandaloneClient, SchedulerState};
 
diff --git a/ballista/rust/scheduler/src/state/persistent_state.rs b/ballista/rust/scheduler/src/state/persistent_state.rs
index 428d523a..e9692482 100644
--- a/ballista/rust/scheduler/src/state/persistent_state.rs
+++ b/ballista/rust/scheduler/src/state/persistent_state.rs
@@ -15,7 +15,18 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::scheduler_server::{
+    create_datafusion_context, SessionBuilder, SessionContextRegistry,
+};
+use crate::state::backend::StateBackendClient;
+use crate::state::stage_manager::StageKey;
 use ballista_core::config::BallistaConfig;
+use ballista_core::error::{BallistaError, Result};
+use ballista_core::serde::protobuf::{JobSessionConfig, JobStatus, KeyValuePair};
+use ballista_core::serde::scheduler::ExecutorMetadata;
+use ballista_core::serde::{protobuf, AsExecutionPlan, BallistaCodec};
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion_proto::logical_plan::AsLogicalPlan;
 use log::{debug, error};
 use parking_lot::RwLock;
 use prost::Message;
@@ -24,19 +35,6 @@ use std::collections::HashMap;
 use std::ops::Deref;
 use std::sync::Arc;
 
-use ballista_core::error::{BallistaError, Result};
-
-use ballista_core::serde::protobuf::{JobSessionConfig, JobStatus, KeyValuePair};
-
-use crate::scheduler_server::{
-    create_datafusion_context, SessionBuilder, SessionContextRegistry,
-};
-use crate::state::backend::StateBackendClient;
-use crate::state::stage_manager::StageKey;
-use ballista_core::serde::scheduler::ExecutorMetadata;
-use ballista_core::serde::{protobuf, AsExecutionPlan, AsLogicalPlan, BallistaCodec};
-use datafusion::physical_plan::ExecutionPlan;
-
 #[derive(Clone)]
 pub(crate) struct PersistentSchedulerState<
     T: 'static + AsLogicalPlan,
@@ -408,13 +406,12 @@ mod test {
     use crate::state::persistent_state::PersistentSchedulerState;
 
     use ballista_core::serde::protobuf::job_status::Status;
-    use ballista_core::serde::protobuf::{
-        JobStatus, LogicalPlanNode, PhysicalPlanNode, QueuedJob,
-    };
+    use ballista_core::serde::protobuf::{JobStatus, PhysicalPlanNode, QueuedJob};
     use ballista_core::serde::BallistaCodec;
     use datafusion::execution::context::default_session_builder;
     use datafusion::logical_plan::LogicalPlanBuilder;
     use datafusion::prelude::SessionContext;
+    use datafusion_proto::protobuf::LogicalPlanNode;
 
     use std::sync::Arc;
 
diff --git a/ballista/rust/scheduler/src/state/task_scheduler.rs b/ballista/rust/scheduler/src/state/task_scheduler.rs
index 132fbcf1..0b40091e 100644
--- a/ballista/rust/scheduler/src/state/task_scheduler.rs
+++ b/ballista/rust/scheduler/src/state/task_scheduler.rs
@@ -26,7 +26,8 @@ use ballista_core::serde::protobuf::{
 };
 use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto;
 use ballista_core::serde::scheduler::{ExecutorData, PartitionId};
-use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan};
+use ballista_core::serde::AsExecutionPlan;
+use datafusion_proto::logical_plan::AsLogicalPlan;
 use log::{debug, info};
 
 #[async_trait]
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index a0352ae8..075633da 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -34,6 +34,7 @@ snmalloc = ["snmalloc-rs"]
 [dependencies]
 ballista = { path = "../ballista/rust/client" }
 datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "3c1c188e1476575f113a511789e398fdd5c009cd" }
 env_logger = "0.9"
 futures = "0.3"
 mimalloc = { version = "0.1", optional = true, default-features = false }
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 0c266812..18b1b018 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -17,23 +17,13 @@
 
 //! Benchmark derived from TPC-H. This is not an official TPC-H benchmark.
 
-use futures::future::join_all;
-use rand::prelude::*;
-use std::ops::Div;
-use std::{
-    fs::{self, File},
-    io::Write,
-    iter::Iterator,
-    path::{Path, PathBuf},
-    sync::Arc,
-    time::{Instant, SystemTime},
-};
-
 use ballista::context::BallistaContext;
 use ballista::prelude::{
     BallistaConfig, BALLISTA_DEFAULT_BATCH_SIZE, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
 };
-
+use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
+use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
+use datafusion::datasource::listing::ListingTableUrl;
 use datafusion::datasource::{MemTable, TableProvider};
 use datafusion::error::{DataFusionError, Result};
 use datafusion::logical_plan::LogicalPlan;
@@ -54,11 +44,18 @@ use datafusion::{
     arrow::util::pretty,
     datasource::listing::{ListingOptions, ListingTable, ListingTableConfig},
 };
-
-use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
-use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
-use datafusion::datasource::listing::ListingTableUrl;
+use futures::future::join_all;
+use rand::prelude::*;
 use serde::Serialize;
+use std::ops::Div;
+use std::{
+    fs::{self, File},
+    io::Write,
+    iter::Iterator,
+    path::{Path, PathBuf},
+    sync::Arc,
+    time::{Instant, SystemTime},
+};
 use structopt::StructOpt;
 
 #[cfg(feature = "snmalloc")]
@@ -921,13 +918,12 @@ struct QueryResult {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use std::env;
-    use std::sync::Arc;
-
     use datafusion::arrow::array::*;
     use datafusion::arrow::util::display::array_value_to_string;
     use datafusion::logical_plan::Expr;
     use datafusion::logical_plan::Expr::Cast;
+    use std::env;
+    use std::sync::Arc;
 
     #[tokio::test]
     async fn q1() -> Result<()> {
@@ -1464,11 +1460,10 @@ mod tests {
 
     mod ballista_round_trip {
         use super::*;
-        use ballista_core::serde::{
-            protobuf, AsExecutionPlan, AsLogicalPlan, BallistaCodec,
-        };
+        use ballista_core::serde::{protobuf, AsExecutionPlan, BallistaCodec};
         use datafusion::datasource::listing::ListingTableUrl;
         use datafusion::physical_plan::ExecutionPlan;
+        use datafusion_proto::logical_plan::AsLogicalPlan;
         use std::ops::Deref;
 
         async fn round_trip_query(n: usize) -> Result<()> {
@@ -1477,7 +1472,7 @@ mod tests {
                 .with_batch_size(10);
             let ctx = SessionContext::with_config(config);
             let codec: BallistaCodec<
-                protobuf::LogicalPlanNode,
+                datafusion_proto::protobuf::LogicalPlanNode,
                 protobuf::PhysicalPlanNode,
             > = BallistaCodec::default();
 
@@ -1505,8 +1500,8 @@ mod tests {
             // test logical plan round trip
             let plans = create_logical_plans(&ctx, n)?;
             for plan in plans {
-                let proto: protobuf::LogicalPlanNode =
-                    protobuf::LogicalPlanNode::try_from_logical_plan(
+                let proto: datafusion_proto::protobuf::LogicalPlanNode =
+                    datafusion_proto::protobuf::LogicalPlanNode::try_from_logical_plan(
                         &plan,
                         codec.logical_extension_codec(),
                     )
@@ -1522,8 +1517,8 @@ mod tests {
 
                 // test optimized logical plan round trip
                 let plan = ctx.optimize(&plan)?;
-                let proto: protobuf::LogicalPlanNode =
-                    protobuf::LogicalPlanNode::try_from_logical_plan(
+                let proto: datafusion_proto::protobuf::LogicalPlanNode =
+                    datafusion_proto::protobuf::LogicalPlanNode::try_from_logical_plan(
                         &plan,
                         codec.logical_extension_codec(),
                     )