Posted to commits@arrow.apache.org by ji...@apache.org on 2022/02/27 14:02:44 UTC

[arrow-datafusion] branch master updated: Remove allow of unused imports (#1853)

This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new f6bbb62  Remove allow of unused imports (#1853)
f6bbb62 is described below

commit f6bbb629c680c57b6461d6200a1d291b1695aa1d
Author: Carol (Nichols || Goulding) <19...@users.noreply.github.com>
AuthorDate: Sun Feb 27 09:02:39 2022 -0500

    Remove allow of unused imports (#1853)
---
 ballista/rust/core/src/client.rs                   | 13 ++---
 ballista/rust/core/src/config.rs                   |  2 -
 .../core/src/execution_plans/distributed_query.rs  |  8 ++-
 .../core/src/execution_plans/shuffle_reader.rs     | 12 ++---
 .../core/src/execution_plans/shuffle_writer.rs     | 22 +++-----
 .../core/src/execution_plans/unresolved_shuffle.rs | 11 +---
 ballista/rust/core/src/lib.rs                      |  1 -
 .../rust/core/src/serde/logical_plan/from_proto.rs | 36 ++++---------
 ballista/rust/core/src/serde/logical_plan/mod.rs   | 23 ++++----
 .../rust/core/src/serde/logical_plan/to_proto.rs   | 33 +++---------
 ballista/rust/core/src/serde/mod.rs                | 12 ++---
 .../core/src/serde/physical_plan/from_proto.rs     | 61 +++++-----------------
 ballista/rust/core/src/serde/physical_plan/mod.rs  |  7 ++-
 .../rust/core/src/serde/physical_plan/to_proto.rs  | 46 ++++------------
 .../rust/core/src/serde/scheduler/from_proto.rs    |  9 +---
 ballista/rust/core/src/serde/scheduler/mod.rs      |  7 ++-
 ballista/rust/core/src/serde/scheduler/to_proto.rs |  4 +-
 ballista/rust/core/src/utils.rs                    | 26 +++------
 18 files changed, 89 insertions(+), 244 deletions(-)

diff --git a/ballista/rust/core/src/client.rs b/ballista/rust/core/src/client.rs
index aae4b2b..ccbaea8 100644
--- a/ballista/rust/core/src/client.rs
+++ b/ballista/rust/core/src/client.rs
@@ -19,7 +19,7 @@
 
 use parking_lot::Mutex;
 use std::sync::Arc;
-use std::{collections::HashMap, pin::Pin};
+
 use std::{
     convert::{TryFrom, TryInto},
     task::{Context, Poll},
@@ -27,27 +27,22 @@ use std::{
 
 use crate::error::{ballista_error, BallistaError, Result};
 use crate::serde::protobuf::{self};
-use crate::serde::scheduler::{
-    Action, ExecutePartition, ExecutePartitionResult, PartitionId, PartitionStats,
-};
+use crate::serde::scheduler::Action;
 
 use arrow_flight::utils::flight_data_to_arrow_batch;
 use arrow_flight::Ticket;
 use arrow_flight::{flight_service_client::FlightServiceClient, FlightData};
 use datafusion::arrow::{
-    array::{StringArray, StructArray},
     datatypes::{Schema, SchemaRef},
     error::{ArrowError, Result as ArrowResult},
     record_batch::RecordBatch,
 };
-use datafusion::physical_plan::common::collect;
-use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
-use datafusion::{logical_plan::LogicalPlan, physical_plan::RecordBatchStream};
+
+use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
 use futures::{Stream, StreamExt};
 use log::debug;
 use prost::Message;
 use tonic::Streaming;
-use uuid::Uuid;
 
 /// Client for interacting with Ballista executors.
 #[derive(Clone)]
diff --git a/ballista/rust/core/src/config.rs b/ballista/rust/core/src/config.rs
index fa60231..8cdaf1f 100644
--- a/ballista/rust/core/src/config.rs
+++ b/ballista/rust/core/src/config.rs
@@ -22,12 +22,10 @@ use clap::ArgEnum;
 use core::fmt;
 use std::collections::HashMap;
 use std::result;
-use std::string::ParseError;
 
 use crate::error::{BallistaError, Result};
 
 use datafusion::arrow::datatypes::DataType;
-use log::warn;
 
 pub const BALLISTA_DEFAULT_SHUFFLE_PARTITIONS: &str = "ballista.shuffle.partitions";
 pub const BALLISTA_WITH_INFORMATION_SCHEMA: &str = "ballista.with_information_schema";
diff --git a/ballista/rust/core/src/execution_plans/distributed_query.rs b/ballista/rust/core/src/execution_plans/distributed_query.rs
index d6b3c3d..e4f5e7d 100644
--- a/ballista/rust/core/src/execution_plans/distributed_query.rs
+++ b/ballista/rust/core/src/execution_plans/distributed_query.rs
@@ -16,11 +16,10 @@
 // under the License.
 
 use std::any::Any;
-use std::convert::TryInto;
+
 use std::fmt::Debug;
 use std::marker::PhantomData;
-use std::marker::Send;
-use std::pin::Pin;
+
 use std::sync::Arc;
 use std::time::Duration;
 
@@ -38,8 +37,7 @@ use datafusion::error::{DataFusionError, Result};
 use datafusion::logical_plan::LogicalPlan;
 use datafusion::physical_plan::expressions::PhysicalSortExpr;
 use datafusion::physical_plan::{
-    DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
-    SendableRecordBatchStream, Statistics,
+    DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
 };
 
 use crate::serde::{AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec};
diff --git a/ballista/rust/core/src/execution_plans/shuffle_reader.rs b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
index 7482c18..ce44be2 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::fmt::Formatter;
 use std::sync::Arc;
 use std::{any::Any, pin::Pin};
 
@@ -25,25 +24,22 @@ use crate::serde::scheduler::{PartitionLocation, PartitionStats};
 use crate::utils::WrappedStream;
 use async_trait::async_trait;
 use datafusion::arrow::datatypes::SchemaRef;
-use datafusion::arrow::error::Result as ArrowResult;
-use datafusion::arrow::record_batch::RecordBatch;
+
 use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::physical_plan::expressions::PhysicalSortExpr;
 use datafusion::physical_plan::metrics::{
     ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
 };
 use datafusion::physical_plan::{
-    DisplayFormatType, ExecutionPlan, Metric, Partitioning, SendableRecordBatchStream,
-    Statistics,
+    DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
 };
 use datafusion::{
     error::{DataFusionError, Result},
     physical_plan::RecordBatchStream,
 };
-use futures::{future, Stream, StreamExt};
-use hashbrown::HashMap;
+use futures::{future, StreamExt};
+
 use log::info;
-use std::time::Instant;
 
 /// ShuffleReaderExec reads partitions that have already been materialized by a ShuffleWriterExec
 /// being executed by an executor
diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
index fbce065..b80fc84 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -21,19 +21,17 @@
 //! will use the ShuffleReaderExec to read these results.
 
 use datafusion::physical_plan::expressions::PhysicalSortExpr;
-use parking_lot::Mutex;
-use std::fs::File;
+
+use std::any::Any;
 use std::iter::Iterator;
 use std::path::PathBuf;
 use std::sync::Arc;
 use std::time::Instant;
-use std::{any::Any, pin::Pin};
 
-use crate::error::BallistaError;
 use crate::utils;
 
 use crate::serde::protobuf::ShuffleWritePartition;
-use crate::serde::scheduler::{PartitionLocation, PartitionStats};
+use crate::serde::scheduler::PartitionStats;
 use async_trait::async_trait;
 use datafusion::arrow::array::{
     Array, ArrayBuilder, ArrayRef, StringBuilder, StructBuilder, UInt32Builder,
@@ -41,8 +39,7 @@ use datafusion::arrow::array::{
 };
 use datafusion::arrow::compute::take;
 use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-use datafusion::arrow::ipc::reader::FileReader;
-use datafusion::arrow::ipc::writer::FileWriter;
+
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::runtime_env::RuntimeEnv;
@@ -52,16 +49,13 @@ use datafusion::physical_plan::memory::MemoryStream;
 use datafusion::physical_plan::metrics::{
     self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
 };
-use datafusion::physical_plan::repartition::RepartitionExec;
-use datafusion::physical_plan::Partitioning::RoundRobinBatch;
+
 use datafusion::physical_plan::{
-    DisplayFormatType, ExecutionPlan, Metric, Partitioning, RecordBatchStream,
-    SendableRecordBatchStream, Statistics,
+    DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
 };
 use futures::StreamExt;
-use hashbrown::HashMap;
+
 use log::{debug, info};
-use uuid::Uuid;
 
 /// ShuffleWriterExec represents a section of a query plan that has consistent partitioning and
 /// can be executed as one unit with each partition being executed in parallel. The output of each
@@ -452,7 +446,7 @@ mod tests {
     use datafusion::arrow::array::{StringArray, StructArray, UInt32Array, UInt64Array};
     use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
     use datafusion::physical_plan::expressions::Column;
-    use datafusion::physical_plan::limit::GlobalLimitExec;
+
     use datafusion::physical_plan::memory::MemoryExec;
     use tempfile::TempDir;
 
diff --git a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
index 90b7c79..418546a 100644
--- a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
+++ b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
@@ -15,24 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::any::Any;
 use std::sync::Arc;
-use std::{any::Any, pin::Pin};
-
-use crate::serde::scheduler::PartitionLocation;
 
 use async_trait::async_trait;
 use datafusion::arrow::datatypes::SchemaRef;
+use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::physical_plan::expressions::PhysicalSortExpr;
 use datafusion::physical_plan::{
     DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
 };
-use datafusion::{
-    error::{DataFusionError, Result},
-    physical_plan::RecordBatchStream,
-};
-use log::info;
-use std::fmt::Formatter;
 
 /// UnresolvedShuffleExec represents a dependency on the results of a ShuffleWriterExec node which hasn't computed yet.
 ///
diff --git a/ballista/rust/core/src/lib.rs b/ballista/rust/core/src/lib.rs
index bc7be4f..8329f63 100644
--- a/ballista/rust/core/src/lib.rs
+++ b/ballista/rust/core/src/lib.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 #![doc = include_str!("../README.md")]
-#![allow(unused_imports)]
 pub const BALLISTA_VERSION: &str = env!("CARGO_PKG_VERSION");
 
 pub fn print_version() {
diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
index d4a28cf..068f1ef 100644
--- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -17,38 +17,26 @@
 
 //! Serde code to convert from protocol buffers to Rust data structures.
 
+use crate::convert_required;
 use crate::error::BallistaError;
-use crate::serde::{
-    from_proto_binary_op, proto_error, protobuf, str_to_byte, vec_to_array,
-};
-use crate::{convert_box_required, convert_required};
-use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
-use datafusion::datasource::file_format::avro::AvroFormat;
-use datafusion::datasource::file_format::csv::CsvFormat;
-use datafusion::datasource::file_format::parquet::ParquetFormat;
-use datafusion::datasource::file_format::FileFormat;
-use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};
-use datafusion::datasource::object_store::local::LocalFileSystem;
-use datafusion::datasource::object_store::{FileMeta, SizedFile};
+use crate::serde::{from_proto_binary_op, proto_error, protobuf, vec_to_array};
+use datafusion::arrow::datatypes::{DataType, Field, Schema};
+
 use datafusion::logical_plan::window_frames::{
     WindowFrame, WindowFrameBound, WindowFrameUnits,
 };
 use datafusion::logical_plan::{
-    abs, acos, asin, atan, ceil, cos, digest, exp, floor, ln, log10, log2, round, signum,
-    sin, sqrt, tan, trunc, Column, CreateExternalTable, DFField, DFSchema, Expr,
-    JoinConstraint, JoinType, LogicalPlan, LogicalPlanBuilder, Operator,
+    abs, atan, ceil, cos, digest, exp, floor, ln, log10, log2, round, signum, sin, sqrt,
+    tan, trunc, Column, DFField, DFSchema, Expr,
 };
 use datafusion::physical_plan::aggregates::AggregateFunction;
 use datafusion::physical_plan::window_functions::BuiltInWindowFunction;
 use datafusion::prelude::*;
 use datafusion::scalar::ScalarValue;
-use protobuf::listing_table_scan_node::FileFormatType;
-use protobuf::logical_plan_node::LogicalPlanType;
-use protobuf::{logical_expr_node::ExprType, scalar_type};
+
 use std::{
     convert::{From, TryInto},
     sync::Arc,
-    unimplemented,
 };
 
 impl From<&protobuf::Column> for Column {
@@ -292,7 +280,6 @@ fn typechecked_scalar_value_conversion(
 impl TryInto<datafusion::scalar::ScalarValue> for &protobuf::scalar_value::Value {
     type Error = BallistaError;
     fn try_into(self) -> Result<datafusion::scalar::ScalarValue, Self::Error> {
-        use datafusion::scalar::ScalarValue;
         use protobuf::PrimitiveScalarType;
         let scalar = match self {
             protobuf::scalar_value::Value::BoolValue(v) => ScalarValue::Boolean(Some(*v)),
@@ -500,7 +487,6 @@ impl TryInto<DataType> for &protobuf::ScalarListType {
 impl TryInto<datafusion::scalar::ScalarValue> for protobuf::PrimitiveScalarType {
     type Error = BallistaError;
     fn try_into(self) -> Result<datafusion::scalar::ScalarValue, Self::Error> {
-        use datafusion::scalar::ScalarValue;
         Ok(match self {
             protobuf::PrimitiveScalarType::Null => {
                 return Err(proto_error("Untyped null is an invalid scalar value"))
@@ -657,7 +643,6 @@ impl TryInto<Expr> for &protobuf::LogicalExprNode {
         use datafusion::physical_plan::window_functions;
         use protobuf::logical_expr_node::ExprType;
         use protobuf::window_expr_node;
-        use protobuf::WindowExprNode;
 
         let expr_type = self
             .expr_type
@@ -671,7 +656,6 @@ impl TryInto<Expr> for &protobuf::LogicalExprNode {
             }),
             ExprType::Column(column) => Ok(Expr::Column(column.into())),
             ExprType::Literal(literal) => {
-                use datafusion::scalar::ScalarValue;
                 let scalar_value: datafusion::scalar::ScalarValue = literal.try_into()?;
                 Ok(Expr::Literal(scalar_value))
             }
@@ -972,11 +956,9 @@ impl TryInto<Field> for &protobuf::Field {
     }
 }
 
-use crate::serde::protobuf::ColumnStats;
-use datafusion::physical_plan::{aggregates, windows};
 use datafusion::prelude::{
-    array, date_part, date_trunc, length, lower, ltrim, md5, rtrim, sha224, sha256,
-    sha384, sha512, trim, upper,
+    date_part, date_trunc, lower, ltrim, rtrim, sha224, sha256, sha384, sha512, trim,
+    upper,
 };
 use std::convert::TryFrom;
 
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index 3166a48..f6e4627 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -28,24 +28,21 @@ use datafusion::datasource::file_format::csv::CsvFormat;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::file_format::FileFormat;
 use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};
-use datafusion::datasource::object_store::local::LocalFileSystem;
-use datafusion::logical_plan::plan::Extension;
+
 use datafusion::logical_plan::plan::{
     Aggregate, EmptyRelation, Filter, Join, Projection, Sort, Window,
 };
 use datafusion::logical_plan::{
-    exprlist_to_fields,
-    window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits},
-    Column, CreateExternalTable, CrossJoin, Expr, JoinConstraint, JoinType, Limit,
-    LogicalPlan, LogicalPlanBuilder, Repartition, TableScan, Values,
+    Column, CreateExternalTable, CrossJoin, Expr, JoinConstraint, Limit, LogicalPlan,
+    LogicalPlanBuilder, Repartition, TableScan, Values,
 };
 use datafusion::prelude::ExecutionContext;
-use log::error;
-use prost::bytes::{Buf, BufMut};
+
+use prost::bytes::BufMut;
 use prost::Message;
 use protobuf::listing_table_scan_node::FileFormatType;
 use protobuf::logical_plan_node::LogicalPlanType;
-use protobuf::{logical_expr_node::ExprType, scalar_type, LogicalPlanNode};
+use protobuf::LogicalPlanNode;
 use std::convert::TryInto;
 use std::sync::Arc;
 
@@ -855,22 +852,20 @@ mod roundtrip_tests {
     use datafusion::datasource::object_store::{
         FileMetaStream, ListEntryStream, ObjectReader, ObjectStore, SizedFile,
     };
-    use datafusion::datasource::TableProvider;
     use datafusion::error::DataFusionError;
     use datafusion::{
         arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit, UnionMode},
         datasource::object_store::local::LocalFileSystem,
         logical_plan::{
-            col, CreateExternalTable, Expr, LogicalPlan, LogicalPlanBuilder,
-            Partitioning, Repartition, ToDFSchema,
+            col, CreateExternalTable, Expr, LogicalPlan, LogicalPlanBuilder, Repartition,
+            ToDFSchema,
         },
         physical_plan::{aggregates, functions::BuiltinScalarFunction::Sqrt},
         prelude::*,
         scalar::ScalarValue,
         sql::parser::FileType,
     };
-    use protobuf::arrow_type;
-    use sqlparser::test_utils::table;
+
     use std::{convert::TryInto, sync::Arc};
 
     #[derive(Debug)]
diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index 9cb8c41..99a5488 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -20,40 +20,23 @@
 //! processes.
 
 use super::super::proto_error;
-use crate::serde::{byte_to_string, protobuf, BallistaError};
+use crate::serde::{protobuf, BallistaError};
 use datafusion::arrow::datatypes::{
     DataType, Field, IntervalUnit, Schema, SchemaRef, TimeUnit, UnionMode,
 };
-use datafusion::datasource::file_format::avro::AvroFormat;
-use datafusion::datasource::file_format::csv::CsvFormat;
-use datafusion::datasource::TableProvider;
 
-use datafusion::datasource::file_format::parquet::ParquetFormat;
-use datafusion::datasource::listing::ListingTable;
-use datafusion::logical_plan::plan::{
-    Aggregate, EmptyRelation, Filter, Join, Projection, Sort, Window,
-};
 use datafusion::logical_plan::{
-    exprlist_to_fields,
     window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits},
-    Column, CreateExternalTable, CrossJoin, Expr, JoinConstraint, JoinType, Limit,
-    LogicalPlan, Repartition, TableScan, Values,
+    Column, Expr,
 };
 use datafusion::physical_plan::aggregates::AggregateFunction;
 use datafusion::physical_plan::functions::BuiltinScalarFunction;
 use datafusion::physical_plan::window_functions::{
     BuiltInWindowFunction, WindowFunction,
 };
-use datafusion::physical_plan::{ColumnStatistics, Statistics};
-use protobuf::listing_table_scan_node::FileFormatType;
-use protobuf::{
-    arrow_type, logical_expr_node::ExprType, scalar_type, DateUnit, PrimitiveScalarType,
-    ScalarListValue, ScalarType,
-};
-use std::{
-    boxed,
-    convert::{TryFrom, TryInto},
-};
+
+use protobuf::{logical_expr_node::ExprType, scalar_type};
+use std::convert::{TryFrom, TryInto};
 
 impl protobuf::IntervalUnit {
     pub fn from_arrow_interval_unit(interval_unit: &IntervalUnit) -> Self {
@@ -179,7 +162,7 @@ impl TryInto<DataType> for &Box<protobuf::List> {
 impl From<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
     fn from(val: &DataType) -> protobuf::arrow_type::ArrowTypeEnum {
         use protobuf::arrow_type::ArrowTypeEnum;
-        use protobuf::ArrowType;
+
         use protobuf::EmptyMessage;
         match val {
             DataType::Null => ArrowTypeEnum::None(EmptyMessage {}),
@@ -301,7 +284,7 @@ fn is_valid_scalar_type_no_list_check(datatype: &DataType) -> bool {
 impl TryFrom<&DataType> for protobuf::scalar_type::Datatype {
     type Error = BallistaError;
     fn try_from(val: &DataType) -> Result<Self, Self::Error> {
-        use protobuf::{List, PrimitiveScalarType};
+        use protobuf::PrimitiveScalarType;
         let scalar_value = match val {
             DataType::Boolean => scalar_type::Datatype::Scalar(PrimitiveScalarType::Bool as i32),
             DataType::Int8 => scalar_type::Datatype::Scalar(PrimitiveScalarType::Int8 as i32),
@@ -635,8 +618,6 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
     type Error = BallistaError;
 
     fn try_into(self) -> Result<protobuf::LogicalExprNode, Self::Error> {
-        use datafusion::scalar::ScalarValue;
-        use protobuf::scalar_value::Value;
         match self {
             Expr::Column(c) => {
                 let expr = protobuf::LogicalExprNode {
diff --git a/ballista/rust/core/src/serde/mod.rs b/ballista/rust/core/src/serde/mod.rs
index 47d69eb..25a8b28 100644
--- a/ballista/rust/core/src/serde/mod.rs
+++ b/ballista/rust/core/src/serde/mod.rs
@@ -18,16 +18,14 @@
 //! This crate contains code generated from the Ballista Protocol Buffer Definition as well
 //! as convenience code for interacting with the generated code.
 
-use prost::bytes::{Buf, BufMut};
+use prost::bytes::BufMut;
 use std::fmt::Debug;
 use std::marker::PhantomData;
 use std::sync::Arc;
 use std::{convert::TryInto, io::Cursor};
 
 use datafusion::arrow::datatypes::{IntervalUnit, UnionMode};
-use datafusion::logical_plan::{
-    JoinConstraint, JoinType, LogicalPlan, Operator, UserDefinedLogicalNode,
-};
+use datafusion::logical_plan::{JoinConstraint, JoinType, LogicalPlan, Operator};
 use datafusion::physical_plan::aggregates::AggregateFunction;
 use datafusion::physical_plan::window_functions::BuiltInWindowFunction;
 
@@ -600,7 +598,7 @@ mod tests {
     use datafusion::prelude::{CsvReadOptions, ExecutionConfig, ExecutionContext};
     use prost::Message;
     use std::any::Any;
-    use std::collections::BTreeMap;
+
     use std::convert::TryInto;
     use std::fmt;
     use std::fmt::{Debug, Formatter};
@@ -608,7 +606,6 @@ mod tests {
 
     pub mod proto {
         use crate::serde::protobuf;
-        use prost::Message;
 
         #[derive(Clone, PartialEq, ::prost::Message)]
         pub struct TopKPlanProto {
@@ -629,8 +626,7 @@ mod tests {
     use crate::error::BallistaError;
     use crate::serde::protobuf::{LogicalPlanNode, PhysicalPlanNode};
     use crate::serde::{
-        AsExecutionPlan, AsLogicalPlan, BallistaCodec, LogicalExtensionCodec,
-        PhysicalExtensionCodec,
+        AsExecutionPlan, AsLogicalPlan, LogicalExtensionCodec, PhysicalExtensionCodec,
     };
     use proto::{TopKExecProto, TopKPlanProto};
 
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 93629a3..9356e4b 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -17,24 +17,16 @@
 
 //! Serde code to convert from protocol buffers to Rust data structures.
 
-use std::collections::HashMap;
 use std::convert::{TryFrom, TryInto};
 use std::sync::Arc;
 
 use crate::error::BallistaError;
-use crate::execution_plans::{
-    ShuffleReaderExec, ShuffleWriterExec, UnresolvedShuffleExec,
-};
-use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
-use crate::serde::protobuf::ShuffleReaderPartition;
-use crate::serde::scheduler::PartitionLocation;
-use crate::serde::{from_proto_binary_op, proto_error, protobuf, str_to_byte};
-use crate::{convert_box_required, convert_required, into_required};
+
+use crate::serde::{from_proto_binary_op, proto_error, protobuf};
+use crate::{convert_box_required, convert_required};
 use chrono::{TimeZone, Utc};
-use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef};
-use datafusion::catalog::catalog::{
-    CatalogList, CatalogProvider, MemoryCatalogList, MemoryCatalogProvider,
-};
+
+use datafusion::catalog::catalog::{CatalogList, MemoryCatalogList};
 use datafusion::datasource::object_store::local::LocalFileSystem;
 use datafusion::datasource::object_store::{FileMeta, ObjectStoreRegistry, SizedFile};
 use datafusion::datasource::PartitionedFile;
@@ -42,47 +34,22 @@ use datafusion::execution::context::{
     ExecutionConfig, ExecutionContextState, ExecutionProps,
 };
 use datafusion::execution::runtime_env::RuntimeEnv;
-use datafusion::logical_plan::{
-    window_frames::WindowFrame, DFSchema, Expr, JoinConstraint, JoinType,
-};
-use datafusion::physical_plan::aggregates::{create_aggregate_expr, AggregateFunction};
-use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
-use datafusion::physical_plan::file_format::{
-    AvroExec, CsvExec, FileScanConfig, ParquetExec,
-};
-use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
-use datafusion::physical_plan::hash_join::PartitionMode;
-use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
-use datafusion::physical_plan::planner::DefaultPhysicalPlanner;
-use datafusion::physical_plan::sorts::sort::{SortExec, SortOptions};
-use datafusion::physical_plan::window_functions::{
-    BuiltInWindowFunction, WindowFunction,
-};
-use datafusion::physical_plan::windows::{create_window_expr, WindowAggExec};
+
+use datafusion::physical_plan::file_format::FileScanConfig;
+
+use datafusion::physical_plan::window_functions::WindowFunction;
+
 use datafusion::physical_plan::{
-    coalesce_batches::CoalesceBatchesExec,
-    cross_join::CrossJoinExec,
-    empty::EmptyExec,
     expressions::{
-        col, Avg, BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr,
-        IsNullExpr, Literal, NegativeExpr, NotExpr, PhysicalSortExpr, TryCastExpr,
-        DEFAULT_DATAFUSION_CAST_OPTIONS,
+        BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr,
+        Literal, NegativeExpr, NotExpr, TryCastExpr, DEFAULT_DATAFUSION_CAST_OPTIONS,
     },
-    filter::FilterExec,
     functions::{self, BuiltinScalarFunction, ScalarFunctionExpr},
-    hash_join::HashJoinExec,
-    limit::{GlobalLimitExec, LocalLimitExec},
-    projection::ProjectionExec,
-    repartition::RepartitionExec,
     Partitioning,
 };
-use datafusion::physical_plan::{
-    AggregateExpr, ColumnStatistics, ExecutionPlan, PhysicalExpr, Statistics, WindowExpr,
-};
-use datafusion::prelude::CsvReadOptions;
-use log::debug;
+use datafusion::physical_plan::{ColumnStatistics, PhysicalExpr, Statistics};
+
 use protobuf::physical_expr_node::ExprType;
-use protobuf::physical_plan_node::PhysicalPlanType;
 
 impl From<&protobuf::PhysicalColumn> for Column {
     fn from(c: &protobuf::PhysicalColumn) -> Column {
diff --git a/ballista/rust/core/src/serde/physical_plan/mod.rs b/ballista/rust/core/src/serde/physical_plan/mod.rs
index 50aa317..5ef6992 100644
--- a/ballista/rust/core/src/serde/physical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/physical_plan/mod.rs
@@ -23,7 +23,7 @@ use crate::serde::physical_plan::from_proto::parse_protobuf_hash_partitioning;
 use crate::serde::protobuf::physical_expr_node::ExprType;
 use crate::serde::protobuf::physical_plan_node::PhysicalPlanType;
 use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
-use crate::serde::protobuf::ShuffleReaderPartition;
+
 use crate::serde::protobuf::{PhysicalExtensionNode, PhysicalPlanNode};
 use crate::serde::scheduler::PartitionLocation;
 use crate::serde::{
@@ -926,7 +926,7 @@ macro_rules! into_physical_plan {
 
 #[cfg(test)]
 mod roundtrip_tests {
-    use std::{convert::TryInto, sync::Arc};
+    use std::sync::Arc;
 
     use crate::serde::{AsExecutionPlan, BallistaCodec};
     use datafusion::physical_plan::sorts::sort::SortExec;
@@ -945,8 +945,7 @@ mod roundtrip_tests {
             hash_aggregate::{AggregateMode, HashAggregateExec},
             hash_join::{HashJoinExec, PartitionMode},
             limit::{GlobalLimitExec, LocalLimitExec},
-            AggregateExpr, ColumnarValue, Distribution, ExecutionPlan, Partitioning,
-            PhysicalExpr,
+            AggregateExpr, ExecutionPlan, Partitioning, PhysicalExpr,
         },
         scalar::ScalarValue,
     };
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index e6f4b59..e3b60e0 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -26,54 +26,26 @@ use std::{
     sync::Arc,
 };
 
-use datafusion::physical_plan::hash_join::{HashJoinExec, PartitionMode};
-use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
-use datafusion::physical_plan::projection::ProjectionExec;
-use datafusion::physical_plan::sorts::sort::SortExec;
-use datafusion::physical_plan::{cross_join::CrossJoinExec, ColumnStatistics};
+use datafusion::physical_plan::expressions::{CastExpr, TryCastExpr};
+use datafusion::physical_plan::ColumnStatistics;
 use datafusion::physical_plan::{
     expressions::{
         CaseExpr, InListExpr, IsNotNullExpr, IsNullExpr, NegativeExpr, NotExpr,
     },
     Statistics,
 };
-use datafusion::physical_plan::{
-    expressions::{CastExpr, TryCastExpr},
-    file_format::ParquetExec,
-};
-use datafusion::physical_plan::{file_format::AvroExec, filter::FilterExec};
-use datafusion::physical_plan::{
-    file_format::FileScanConfig, hash_aggregate::AggregateMode,
-};
-use datafusion::{
-    datasource::PartitionedFile, physical_plan::coalesce_batches::CoalesceBatchesExec,
-};
-use datafusion::{logical_plan::JoinType, physical_plan::file_format::CsvExec};
-use datafusion::{
-    physical_plan::expressions::{Count, Literal},
-    scalar::ScalarValue,
-};
 
-use datafusion::physical_plan::{
-    empty::EmptyExec,
-    expressions::{Avg, BinaryExpr, Column, Max, Min, Sum},
-    Partitioning,
-};
-use datafusion::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr};
+use datafusion::datasource::PartitionedFile;
+use datafusion::physical_plan::file_format::FileScanConfig;
+
+use datafusion::physical_plan::expressions::{Count, Literal};
 
-use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
-use protobuf::physical_plan_node::PhysicalPlanType;
+use datafusion::physical_plan::expressions::{Avg, BinaryExpr, Column, Max, Min, Sum};
+use datafusion::physical_plan::{AggregateExpr, PhysicalExpr};
 
-use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
-use crate::serde::scheduler::PartitionLocation;
 use crate::serde::{protobuf, BallistaError};
-use crate::{
-    execution_plans::{ShuffleReaderExec, ShuffleWriterExec, UnresolvedShuffleExec},
-    serde::byte_to_string,
-};
-use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+
 use datafusion::physical_plan::functions::{BuiltinScalarFunction, ScalarFunctionExpr};
-use datafusion::physical_plan::repartition::RepartitionExec;
 
 impl TryInto<protobuf::PhysicalExprNode> for Arc<dyn AggregateExpr> {
     type Error = BallistaError;
diff --git a/ballista/rust/core/src/serde/scheduler/from_proto.rs b/ballista/rust/core/src/serde/scheduler/from_proto.rs
index 8d4e279..b401f1f 100644
--- a/ballista/rust/core/src/serde/scheduler/from_proto.rs
+++ b/ballista/rust/core/src/serde/scheduler/from_proto.rs
@@ -15,17 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{collections::HashMap, convert::TryInto};
+use std::convert::TryInto;
 
 use crate::error::BallistaError;
 use crate::serde::protobuf;
 use crate::serde::protobuf::action::ActionType;
-use crate::serde::scheduler::{
-    Action, ExecutePartition, PartitionId, PartitionLocation, PartitionStats,
-};
-
-use datafusion::logical_plan::LogicalPlan;
-use uuid::Uuid;
+use crate::serde::scheduler::{Action, PartitionId, PartitionLocation, PartitionStats};
 
 impl TryInto<Action> for protobuf::Action {
     type Error = BallistaError;
diff --git a/ballista/rust/core/src/serde/scheduler/mod.rs b/ballista/rust/core/src/serde/scheduler/mod.rs
index a03811c..c304382 100644
--- a/ballista/rust/core/src/serde/scheduler/mod.rs
+++ b/ballista/rust/core/src/serde/scheduler/mod.rs
@@ -18,14 +18,13 @@
 use std::{collections::HashMap, fmt, sync::Arc};
 
 use datafusion::arrow::array::{
-    ArrayBuilder, ArrayRef, StructArray, StructBuilder, UInt64Array, UInt64Builder,
+    ArrayBuilder, StructArray, StructBuilder, UInt64Array, UInt64Builder,
 };
-use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-use datafusion::logical_plan::LogicalPlan;
+use datafusion::arrow::datatypes::{DataType, Field};
+
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::physical_plan::Partitioning;
 use serde::Serialize;
-use uuid::Uuid;
 
 use super::protobuf;
 use crate::error::BallistaError;
diff --git a/ballista/rust/core/src/serde/scheduler/to_proto.rs b/ballista/rust/core/src/serde/scheduler/to_proto.rs
index 71a02d3..4c1c5d1 100644
--- a/ballista/rust/core/src/serde/scheduler/to_proto.rs
+++ b/ballista/rust/core/src/serde/scheduler/to_proto.rs
@@ -20,9 +20,7 @@ use std::convert::TryInto;
 use crate::error::BallistaError;
 use crate::serde::protobuf;
 use crate::serde::protobuf::action::ActionType;
-use crate::serde::scheduler::{
-    Action, ExecutePartition, PartitionId, PartitionLocation, PartitionStats,
-};
+use crate::serde::scheduler::{Action, PartitionId, PartitionLocation, PartitionStats};
 use datafusion::physical_plan::Partitioning;
 
 impl TryInto<protobuf::Action> for Action {
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index fae3826..b612ddb 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -15,10 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::collections::HashMap;
 use std::io::{BufWriter, Write};
 use std::marker::PhantomData;
-use std::ops::Deref;
+
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::{fs::File, pin::Pin};
@@ -35,38 +34,27 @@ use async_trait::async_trait;
 use datafusion::arrow::datatypes::Schema;
 use datafusion::arrow::error::Result as ArrowResult;
 use datafusion::arrow::{
-    array::{
-        ArrayBuilder, ArrayRef, StructArray, StructBuilder, UInt64Array, UInt64Builder,
-    },
-    datatypes::{DataType, Field, SchemaRef},
-    ipc::reader::FileReader,
-    ipc::writer::FileWriter,
-    record_batch::RecordBatch,
+    datatypes::SchemaRef, ipc::writer::FileWriter, record_batch::RecordBatch,
 };
 use datafusion::error::DataFusionError;
 use datafusion::execution::context::{
     ExecutionConfig, ExecutionContext, ExecutionContextState, QueryPlanner,
 };
-use datafusion::logical_plan::{LogicalPlan, Operator, TableScan};
-use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
-use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
-use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;
+use datafusion::logical_plan::LogicalPlan;
+
 use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::common::batch_byte_size;
 use datafusion::physical_plan::empty::EmptyExec;
-use datafusion::physical_plan::expressions::{BinaryExpr, Column, Literal};
+
 use datafusion::physical_plan::file_format::{CsvExec, ParquetExec};
 use datafusion::physical_plan::filter::FilterExec;
 use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
 use datafusion::physical_plan::hash_join::HashJoinExec;
 use datafusion::physical_plan::projection::ProjectionExec;
 use datafusion::physical_plan::sorts::sort::SortExec;
-use datafusion::physical_plan::{
-    metrics, AggregateExpr, ExecutionPlan, Metric, PhysicalExpr, RecordBatchStream,
-};
-use futures::{future, Stream, StreamExt};
-use std::time::Instant;
+use datafusion::physical_plan::{metrics, ExecutionPlan, RecordBatchStream};
+use futures::{Stream, StreamExt};
 
 /// Stream data to disk in Arrow IPC format
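
For context only, and not part of the commit above: the line removed from ballista/rust/core/src/lib.rs, #![allow(unused_imports)], is a crate-level attribute that silences Rust's unused_imports lint for the whole crate. With it gone, rustc warns on every import that is never used, which is what forced the import cleanup across the 18 files in the diffstat. Below is a minimal, hypothetical sketch of that effect; the module and names are illustrative and do not come from the Ballista sources.

// Hypothetical crate root (lib.rs), illustrating the lint behavior this commit changes.
// With the crate-level attribute present, unused imports compile silently:
//
//     #![allow(unused_imports)]
//
// After removing it (as this commit does for ballista-core), rustc emits
// "warning: unused import" for imports like the one below, so they have to be
// deleted or actually used.

use std::collections::HashMap; // unused here -> now produces a warning

pub fn print_greeting() {
    println!("hello from an illustrative module");
}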