You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ji...@apache.org on 2022/02/27 14:02:44 UTC
[arrow-datafusion] branch master updated: Remove allow of unused imports (#1853)
This is an automated email from the ASF dual-hosted git repository.
jiayuliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new f6bbb62 Remove allow of unused imports (#1853)
f6bbb62 is described below
commit f6bbb629c680c57b6461d6200a1d291b1695aa1d
Author: Carol (Nichols || Goulding) <19...@users.noreply.github.com>
AuthorDate: Sun Feb 27 09:02:39 2022 -0500
Remove allow of unused imports (#1853)
---
ballista/rust/core/src/client.rs | 13 ++---
ballista/rust/core/src/config.rs | 2 -
.../core/src/execution_plans/distributed_query.rs | 8 ++-
.../core/src/execution_plans/shuffle_reader.rs | 12 ++---
.../core/src/execution_plans/shuffle_writer.rs | 22 +++-----
.../core/src/execution_plans/unresolved_shuffle.rs | 11 +---
ballista/rust/core/src/lib.rs | 1 -
.../rust/core/src/serde/logical_plan/from_proto.rs | 36 ++++---------
ballista/rust/core/src/serde/logical_plan/mod.rs | 23 ++++----
.../rust/core/src/serde/logical_plan/to_proto.rs | 33 +++---------
ballista/rust/core/src/serde/mod.rs | 12 ++---
.../core/src/serde/physical_plan/from_proto.rs | 61 +++++-----------------
ballista/rust/core/src/serde/physical_plan/mod.rs | 7 ++-
.../rust/core/src/serde/physical_plan/to_proto.rs | 46 ++++------------
.../rust/core/src/serde/scheduler/from_proto.rs | 9 +---
ballista/rust/core/src/serde/scheduler/mod.rs | 7 ++-
ballista/rust/core/src/serde/scheduler/to_proto.rs | 4 +-
ballista/rust/core/src/utils.rs | 26 +++------
18 files changed, 89 insertions(+), 244 deletions(-)
diff --git a/ballista/rust/core/src/client.rs b/ballista/rust/core/src/client.rs
index aae4b2b..ccbaea8 100644
--- a/ballista/rust/core/src/client.rs
+++ b/ballista/rust/core/src/client.rs
@@ -19,7 +19,7 @@
use parking_lot::Mutex;
use std::sync::Arc;
-use std::{collections::HashMap, pin::Pin};
+
use std::{
convert::{TryFrom, TryInto},
task::{Context, Poll},
@@ -27,27 +27,22 @@ use std::{
use crate::error::{ballista_error, BallistaError, Result};
use crate::serde::protobuf::{self};
-use crate::serde::scheduler::{
- Action, ExecutePartition, ExecutePartitionResult, PartitionId, PartitionStats,
-};
+use crate::serde::scheduler::Action;
use arrow_flight::utils::flight_data_to_arrow_batch;
use arrow_flight::Ticket;
use arrow_flight::{flight_service_client::FlightServiceClient, FlightData};
use datafusion::arrow::{
- array::{StringArray, StructArray},
datatypes::{Schema, SchemaRef},
error::{ArrowError, Result as ArrowResult},
record_batch::RecordBatch,
};
-use datafusion::physical_plan::common::collect;
-use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
-use datafusion::{logical_plan::LogicalPlan, physical_plan::RecordBatchStream};
+
+use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
use futures::{Stream, StreamExt};
use log::debug;
use prost::Message;
use tonic::Streaming;
-use uuid::Uuid;
/// Client for interacting with Ballista executors.
#[derive(Clone)]
diff --git a/ballista/rust/core/src/config.rs b/ballista/rust/core/src/config.rs
index fa60231..8cdaf1f 100644
--- a/ballista/rust/core/src/config.rs
+++ b/ballista/rust/core/src/config.rs
@@ -22,12 +22,10 @@ use clap::ArgEnum;
use core::fmt;
use std::collections::HashMap;
use std::result;
-use std::string::ParseError;
use crate::error::{BallistaError, Result};
use datafusion::arrow::datatypes::DataType;
-use log::warn;
pub const BALLISTA_DEFAULT_SHUFFLE_PARTITIONS: &str = "ballista.shuffle.partitions";
pub const BALLISTA_WITH_INFORMATION_SCHEMA: &str = "ballista.with_information_schema";
diff --git a/ballista/rust/core/src/execution_plans/distributed_query.rs b/ballista/rust/core/src/execution_plans/distributed_query.rs
index d6b3c3d..e4f5e7d 100644
--- a/ballista/rust/core/src/execution_plans/distributed_query.rs
+++ b/ballista/rust/core/src/execution_plans/distributed_query.rs
@@ -16,11 +16,10 @@
// under the License.
use std::any::Any;
-use std::convert::TryInto;
+
use std::fmt::Debug;
use std::marker::PhantomData;
-use std::marker::Send;
-use std::pin::Pin;
+
use std::sync::Arc;
use std::time::Duration;
@@ -38,8 +37,7 @@ use datafusion::error::{DataFusionError, Result};
use datafusion::logical_plan::LogicalPlan;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::{
- DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
- SendableRecordBatchStream, Statistics,
+ DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
};
use crate::serde::{AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec};
diff --git a/ballista/rust/core/src/execution_plans/shuffle_reader.rs b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
index 7482c18..ce44be2 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
@@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-use std::fmt::Formatter;
use std::sync::Arc;
use std::{any::Any, pin::Pin};
@@ -25,25 +24,22 @@ use crate::serde::scheduler::{PartitionLocation, PartitionStats};
use crate::utils::WrappedStream;
use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
-use datafusion::arrow::error::Result as ArrowResult;
-use datafusion::arrow::record_batch::RecordBatch;
+
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{
ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
};
use datafusion::physical_plan::{
- DisplayFormatType, ExecutionPlan, Metric, Partitioning, SendableRecordBatchStream,
- Statistics,
+ DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
};
use datafusion::{
error::{DataFusionError, Result},
physical_plan::RecordBatchStream,
};
-use futures::{future, Stream, StreamExt};
-use hashbrown::HashMap;
+use futures::{future, StreamExt};
+
use log::info;
-use std::time::Instant;
/// ShuffleReaderExec reads partitions that have already been materialized by a ShuffleWriterExec
/// being executed by an executor
diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
index fbce065..b80fc84 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -21,19 +21,17 @@
//! will use the ShuffleReaderExec to read these results.
use datafusion::physical_plan::expressions::PhysicalSortExpr;
-use parking_lot::Mutex;
-use std::fs::File;
+
+use std::any::Any;
use std::iter::Iterator;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;
-use std::{any::Any, pin::Pin};
-use crate::error::BallistaError;
use crate::utils;
use crate::serde::protobuf::ShuffleWritePartition;
-use crate::serde::scheduler::{PartitionLocation, PartitionStats};
+use crate::serde::scheduler::PartitionStats;
use async_trait::async_trait;
use datafusion::arrow::array::{
Array, ArrayBuilder, ArrayRef, StringBuilder, StructBuilder, UInt32Builder,
@@ -41,8 +39,7 @@ use datafusion::arrow::array::{
};
use datafusion::arrow::compute::take;
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-use datafusion::arrow::ipc::reader::FileReader;
-use datafusion::arrow::ipc::writer::FileWriter;
+
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::runtime_env::RuntimeEnv;
@@ -52,16 +49,13 @@ use datafusion::physical_plan::memory::MemoryStream;
use datafusion::physical_plan::metrics::{
self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
};
-use datafusion::physical_plan::repartition::RepartitionExec;
-use datafusion::physical_plan::Partitioning::RoundRobinBatch;
+
use datafusion::physical_plan::{
- DisplayFormatType, ExecutionPlan, Metric, Partitioning, RecordBatchStream,
- SendableRecordBatchStream, Statistics,
+ DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
};
use futures::StreamExt;
-use hashbrown::HashMap;
+
use log::{debug, info};
-use uuid::Uuid;
/// ShuffleWriterExec represents a section of a query plan that has consistent partitioning and
/// can be executed as one unit with each partition being executed in parallel. The output of each
@@ -452,7 +446,7 @@ mod tests {
use datafusion::arrow::array::{StringArray, StructArray, UInt32Array, UInt64Array};
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::expressions::Column;
- use datafusion::physical_plan::limit::GlobalLimitExec;
+
use datafusion::physical_plan::memory::MemoryExec;
use tempfile::TempDir;
diff --git a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
index 90b7c79..418546a 100644
--- a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
+++ b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
@@ -15,24 +15,17 @@
// specific language governing permissions and limitations
// under the License.
+use std::any::Any;
use std::sync::Arc;
-use std::{any::Any, pin::Pin};
-
-use crate::serde::scheduler::PartitionLocation;
use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
+use datafusion::error::{DataFusionError, Result};
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
};
-use datafusion::{
- error::{DataFusionError, Result},
- physical_plan::RecordBatchStream,
-};
-use log::info;
-use std::fmt::Formatter;
/// UnresolvedShuffleExec represents a dependency on the results of a ShuffleWriterExec node which hasn't computed yet.
///
diff --git a/ballista/rust/core/src/lib.rs b/ballista/rust/core/src/lib.rs
index bc7be4f..8329f63 100644
--- a/ballista/rust/core/src/lib.rs
+++ b/ballista/rust/core/src/lib.rs
@@ -16,7 +16,6 @@
// under the License.
#![doc = include_str!("../README.md")]
-#![allow(unused_imports)]
pub const BALLISTA_VERSION: &str = env!("CARGO_PKG_VERSION");
pub fn print_version() {
diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
index d4a28cf..068f1ef 100644
--- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -17,38 +17,26 @@
//! Serde code to convert from protocol buffers to Rust data structures.
+use crate::convert_required;
use crate::error::BallistaError;
-use crate::serde::{
- from_proto_binary_op, proto_error, protobuf, str_to_byte, vec_to_array,
-};
-use crate::{convert_box_required, convert_required};
-use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
-use datafusion::datasource::file_format::avro::AvroFormat;
-use datafusion::datasource::file_format::csv::CsvFormat;
-use datafusion::datasource::file_format::parquet::ParquetFormat;
-use datafusion::datasource::file_format::FileFormat;
-use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};
-use datafusion::datasource::object_store::local::LocalFileSystem;
-use datafusion::datasource::object_store::{FileMeta, SizedFile};
+use crate::serde::{from_proto_binary_op, proto_error, protobuf, vec_to_array};
+use datafusion::arrow::datatypes::{DataType, Field, Schema};
+
use datafusion::logical_plan::window_frames::{
WindowFrame, WindowFrameBound, WindowFrameUnits,
};
use datafusion::logical_plan::{
- abs, acos, asin, atan, ceil, cos, digest, exp, floor, ln, log10, log2, round, signum,
- sin, sqrt, tan, trunc, Column, CreateExternalTable, DFField, DFSchema, Expr,
- JoinConstraint, JoinType, LogicalPlan, LogicalPlanBuilder, Operator,
+ abs, atan, ceil, cos, digest, exp, floor, ln, log10, log2, round, signum, sin, sqrt,
+ tan, trunc, Column, DFField, DFSchema, Expr,
};
use datafusion::physical_plan::aggregates::AggregateFunction;
use datafusion::physical_plan::window_functions::BuiltInWindowFunction;
use datafusion::prelude::*;
use datafusion::scalar::ScalarValue;
-use protobuf::listing_table_scan_node::FileFormatType;
-use protobuf::logical_plan_node::LogicalPlanType;
-use protobuf::{logical_expr_node::ExprType, scalar_type};
+
use std::{
convert::{From, TryInto},
sync::Arc,
- unimplemented,
};
impl From<&protobuf::Column> for Column {
@@ -292,7 +280,6 @@ fn typechecked_scalar_value_conversion(
impl TryInto<datafusion::scalar::ScalarValue> for &protobuf::scalar_value::Value {
type Error = BallistaError;
fn try_into(self) -> Result<datafusion::scalar::ScalarValue, Self::Error> {
- use datafusion::scalar::ScalarValue;
use protobuf::PrimitiveScalarType;
let scalar = match self {
protobuf::scalar_value::Value::BoolValue(v) => ScalarValue::Boolean(Some(*v)),
@@ -500,7 +487,6 @@ impl TryInto<DataType> for &protobuf::ScalarListType {
impl TryInto<datafusion::scalar::ScalarValue> for protobuf::PrimitiveScalarType {
type Error = BallistaError;
fn try_into(self) -> Result<datafusion::scalar::ScalarValue, Self::Error> {
- use datafusion::scalar::ScalarValue;
Ok(match self {
protobuf::PrimitiveScalarType::Null => {
return Err(proto_error("Untyped null is an invalid scalar value"))
@@ -657,7 +643,6 @@ impl TryInto<Expr> for &protobuf::LogicalExprNode {
use datafusion::physical_plan::window_functions;
use protobuf::logical_expr_node::ExprType;
use protobuf::window_expr_node;
- use protobuf::WindowExprNode;
let expr_type = self
.expr_type
@@ -671,7 +656,6 @@ impl TryInto<Expr> for &protobuf::LogicalExprNode {
}),
ExprType::Column(column) => Ok(Expr::Column(column.into())),
ExprType::Literal(literal) => {
- use datafusion::scalar::ScalarValue;
let scalar_value: datafusion::scalar::ScalarValue = literal.try_into()?;
Ok(Expr::Literal(scalar_value))
}
@@ -972,11 +956,9 @@ impl TryInto<Field> for &protobuf::Field {
}
}
-use crate::serde::protobuf::ColumnStats;
-use datafusion::physical_plan::{aggregates, windows};
use datafusion::prelude::{
- array, date_part, date_trunc, length, lower, ltrim, md5, rtrim, sha224, sha256,
- sha384, sha512, trim, upper,
+ date_part, date_trunc, lower, ltrim, rtrim, sha224, sha256, sha384, sha512, trim,
+ upper,
};
use std::convert::TryFrom;
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index 3166a48..f6e4627 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -28,24 +28,21 @@ use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};
-use datafusion::datasource::object_store::local::LocalFileSystem;
-use datafusion::logical_plan::plan::Extension;
+
use datafusion::logical_plan::plan::{
Aggregate, EmptyRelation, Filter, Join, Projection, Sort, Window,
};
use datafusion::logical_plan::{
- exprlist_to_fields,
- window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits},
- Column, CreateExternalTable, CrossJoin, Expr, JoinConstraint, JoinType, Limit,
- LogicalPlan, LogicalPlanBuilder, Repartition, TableScan, Values,
+ Column, CreateExternalTable, CrossJoin, Expr, JoinConstraint, Limit, LogicalPlan,
+ LogicalPlanBuilder, Repartition, TableScan, Values,
};
use datafusion::prelude::ExecutionContext;
-use log::error;
-use prost::bytes::{Buf, BufMut};
+
+use prost::bytes::BufMut;
use prost::Message;
use protobuf::listing_table_scan_node::FileFormatType;
use protobuf::logical_plan_node::LogicalPlanType;
-use protobuf::{logical_expr_node::ExprType, scalar_type, LogicalPlanNode};
+use protobuf::LogicalPlanNode;
use std::convert::TryInto;
use std::sync::Arc;
@@ -855,22 +852,20 @@ mod roundtrip_tests {
use datafusion::datasource::object_store::{
FileMetaStream, ListEntryStream, ObjectReader, ObjectStore, SizedFile,
};
- use datafusion::datasource::TableProvider;
use datafusion::error::DataFusionError;
use datafusion::{
arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit, UnionMode},
datasource::object_store::local::LocalFileSystem,
logical_plan::{
- col, CreateExternalTable, Expr, LogicalPlan, LogicalPlanBuilder,
- Partitioning, Repartition, ToDFSchema,
+ col, CreateExternalTable, Expr, LogicalPlan, LogicalPlanBuilder, Repartition,
+ ToDFSchema,
},
physical_plan::{aggregates, functions::BuiltinScalarFunction::Sqrt},
prelude::*,
scalar::ScalarValue,
sql::parser::FileType,
};
- use protobuf::arrow_type;
- use sqlparser::test_utils::table;
+
use std::{convert::TryInto, sync::Arc};
#[derive(Debug)]
diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index 9cb8c41..99a5488 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -20,40 +20,23 @@
//! processes.
use super::super::proto_error;
-use crate::serde::{byte_to_string, protobuf, BallistaError};
+use crate::serde::{protobuf, BallistaError};
use datafusion::arrow::datatypes::{
DataType, Field, IntervalUnit, Schema, SchemaRef, TimeUnit, UnionMode,
};
-use datafusion::datasource::file_format::avro::AvroFormat;
-use datafusion::datasource::file_format::csv::CsvFormat;
-use datafusion::datasource::TableProvider;
-use datafusion::datasource::file_format::parquet::ParquetFormat;
-use datafusion::datasource::listing::ListingTable;
-use datafusion::logical_plan::plan::{
- Aggregate, EmptyRelation, Filter, Join, Projection, Sort, Window,
-};
use datafusion::logical_plan::{
- exprlist_to_fields,
window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits},
- Column, CreateExternalTable, CrossJoin, Expr, JoinConstraint, JoinType, Limit,
- LogicalPlan, Repartition, TableScan, Values,
+ Column, Expr,
};
use datafusion::physical_plan::aggregates::AggregateFunction;
use datafusion::physical_plan::functions::BuiltinScalarFunction;
use datafusion::physical_plan::window_functions::{
BuiltInWindowFunction, WindowFunction,
};
-use datafusion::physical_plan::{ColumnStatistics, Statistics};
-use protobuf::listing_table_scan_node::FileFormatType;
-use protobuf::{
- arrow_type, logical_expr_node::ExprType, scalar_type, DateUnit, PrimitiveScalarType,
- ScalarListValue, ScalarType,
-};
-use std::{
- boxed,
- convert::{TryFrom, TryInto},
-};
+
+use protobuf::{logical_expr_node::ExprType, scalar_type};
+use std::convert::{TryFrom, TryInto};
impl protobuf::IntervalUnit {
pub fn from_arrow_interval_unit(interval_unit: &IntervalUnit) -> Self {
@@ -179,7 +162,7 @@ impl TryInto<DataType> for &Box<protobuf::List> {
impl From<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
fn from(val: &DataType) -> protobuf::arrow_type::ArrowTypeEnum {
use protobuf::arrow_type::ArrowTypeEnum;
- use protobuf::ArrowType;
+
use protobuf::EmptyMessage;
match val {
DataType::Null => ArrowTypeEnum::None(EmptyMessage {}),
@@ -301,7 +284,7 @@ fn is_valid_scalar_type_no_list_check(datatype: &DataType) -> bool {
impl TryFrom<&DataType> for protobuf::scalar_type::Datatype {
type Error = BallistaError;
fn try_from(val: &DataType) -> Result<Self, Self::Error> {
- use protobuf::{List, PrimitiveScalarType};
+ use protobuf::PrimitiveScalarType;
let scalar_value = match val {
DataType::Boolean => scalar_type::Datatype::Scalar(PrimitiveScalarType::Bool as i32),
DataType::Int8 => scalar_type::Datatype::Scalar(PrimitiveScalarType::Int8 as i32),
@@ -635,8 +618,6 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
type Error = BallistaError;
fn try_into(self) -> Result<protobuf::LogicalExprNode, Self::Error> {
- use datafusion::scalar::ScalarValue;
- use protobuf::scalar_value::Value;
match self {
Expr::Column(c) => {
let expr = protobuf::LogicalExprNode {
diff --git a/ballista/rust/core/src/serde/mod.rs b/ballista/rust/core/src/serde/mod.rs
index 47d69eb..25a8b28 100644
--- a/ballista/rust/core/src/serde/mod.rs
+++ b/ballista/rust/core/src/serde/mod.rs
@@ -18,16 +18,14 @@
//! This crate contains code generated from the Ballista Protocol Buffer Definition as well
//! as convenience code for interacting with the generated code.
-use prost::bytes::{Buf, BufMut};
+use prost::bytes::BufMut;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;
use std::{convert::TryInto, io::Cursor};
use datafusion::arrow::datatypes::{IntervalUnit, UnionMode};
-use datafusion::logical_plan::{
- JoinConstraint, JoinType, LogicalPlan, Operator, UserDefinedLogicalNode,
-};
+use datafusion::logical_plan::{JoinConstraint, JoinType, LogicalPlan, Operator};
use datafusion::physical_plan::aggregates::AggregateFunction;
use datafusion::physical_plan::window_functions::BuiltInWindowFunction;
@@ -600,7 +598,7 @@ mod tests {
use datafusion::prelude::{CsvReadOptions, ExecutionConfig, ExecutionContext};
use prost::Message;
use std::any::Any;
- use std::collections::BTreeMap;
+
use std::convert::TryInto;
use std::fmt;
use std::fmt::{Debug, Formatter};
@@ -608,7 +606,6 @@ mod tests {
pub mod proto {
use crate::serde::protobuf;
- use prost::Message;
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TopKPlanProto {
@@ -629,8 +626,7 @@ mod tests {
use crate::error::BallistaError;
use crate::serde::protobuf::{LogicalPlanNode, PhysicalPlanNode};
use crate::serde::{
- AsExecutionPlan, AsLogicalPlan, BallistaCodec, LogicalExtensionCodec,
- PhysicalExtensionCodec,
+ AsExecutionPlan, AsLogicalPlan, LogicalExtensionCodec, PhysicalExtensionCodec,
};
use proto::{TopKExecProto, TopKPlanProto};
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 93629a3..9356e4b 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -17,24 +17,16 @@
//! Serde code to convert from protocol buffers to Rust data structures.
-use std::collections::HashMap;
use std::convert::{TryFrom, TryInto};
use std::sync::Arc;
use crate::error::BallistaError;
-use crate::execution_plans::{
- ShuffleReaderExec, ShuffleWriterExec, UnresolvedShuffleExec,
-};
-use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
-use crate::serde::protobuf::ShuffleReaderPartition;
-use crate::serde::scheduler::PartitionLocation;
-use crate::serde::{from_proto_binary_op, proto_error, protobuf, str_to_byte};
-use crate::{convert_box_required, convert_required, into_required};
+
+use crate::serde::{from_proto_binary_op, proto_error, protobuf};
+use crate::{convert_box_required, convert_required};
use chrono::{TimeZone, Utc};
-use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef};
-use datafusion::catalog::catalog::{
- CatalogList, CatalogProvider, MemoryCatalogList, MemoryCatalogProvider,
-};
+
+use datafusion::catalog::catalog::{CatalogList, MemoryCatalogList};
use datafusion::datasource::object_store::local::LocalFileSystem;
use datafusion::datasource::object_store::{FileMeta, ObjectStoreRegistry, SizedFile};
use datafusion::datasource::PartitionedFile;
@@ -42,47 +34,22 @@ use datafusion::execution::context::{
ExecutionConfig, ExecutionContextState, ExecutionProps,
};
use datafusion::execution::runtime_env::RuntimeEnv;
-use datafusion::logical_plan::{
- window_frames::WindowFrame, DFSchema, Expr, JoinConstraint, JoinType,
-};
-use datafusion::physical_plan::aggregates::{create_aggregate_expr, AggregateFunction};
-use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
-use datafusion::physical_plan::file_format::{
- AvroExec, CsvExec, FileScanConfig, ParquetExec,
-};
-use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
-use datafusion::physical_plan::hash_join::PartitionMode;
-use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
-use datafusion::physical_plan::planner::DefaultPhysicalPlanner;
-use datafusion::physical_plan::sorts::sort::{SortExec, SortOptions};
-use datafusion::physical_plan::window_functions::{
- BuiltInWindowFunction, WindowFunction,
-};
-use datafusion::physical_plan::windows::{create_window_expr, WindowAggExec};
+
+use datafusion::physical_plan::file_format::FileScanConfig;
+
+use datafusion::physical_plan::window_functions::WindowFunction;
+
use datafusion::physical_plan::{
- coalesce_batches::CoalesceBatchesExec,
- cross_join::CrossJoinExec,
- empty::EmptyExec,
expressions::{
- col, Avg, BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr,
- IsNullExpr, Literal, NegativeExpr, NotExpr, PhysicalSortExpr, TryCastExpr,
- DEFAULT_DATAFUSION_CAST_OPTIONS,
+ BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr,
+ Literal, NegativeExpr, NotExpr, TryCastExpr, DEFAULT_DATAFUSION_CAST_OPTIONS,
},
- filter::FilterExec,
functions::{self, BuiltinScalarFunction, ScalarFunctionExpr},
- hash_join::HashJoinExec,
- limit::{GlobalLimitExec, LocalLimitExec},
- projection::ProjectionExec,
- repartition::RepartitionExec,
Partitioning,
};
-use datafusion::physical_plan::{
- AggregateExpr, ColumnStatistics, ExecutionPlan, PhysicalExpr, Statistics, WindowExpr,
-};
-use datafusion::prelude::CsvReadOptions;
-use log::debug;
+use datafusion::physical_plan::{ColumnStatistics, PhysicalExpr, Statistics};
+
use protobuf::physical_expr_node::ExprType;
-use protobuf::physical_plan_node::PhysicalPlanType;
impl From<&protobuf::PhysicalColumn> for Column {
fn from(c: &protobuf::PhysicalColumn) -> Column {
diff --git a/ballista/rust/core/src/serde/physical_plan/mod.rs b/ballista/rust/core/src/serde/physical_plan/mod.rs
index 50aa317..5ef6992 100644
--- a/ballista/rust/core/src/serde/physical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/physical_plan/mod.rs
@@ -23,7 +23,7 @@ use crate::serde::physical_plan::from_proto::parse_protobuf_hash_partitioning;
use crate::serde::protobuf::physical_expr_node::ExprType;
use crate::serde::protobuf::physical_plan_node::PhysicalPlanType;
use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
-use crate::serde::protobuf::ShuffleReaderPartition;
+
use crate::serde::protobuf::{PhysicalExtensionNode, PhysicalPlanNode};
use crate::serde::scheduler::PartitionLocation;
use crate::serde::{
@@ -926,7 +926,7 @@ macro_rules! into_physical_plan {
#[cfg(test)]
mod roundtrip_tests {
- use std::{convert::TryInto, sync::Arc};
+ use std::sync::Arc;
use crate::serde::{AsExecutionPlan, BallistaCodec};
use datafusion::physical_plan::sorts::sort::SortExec;
@@ -945,8 +945,7 @@ mod roundtrip_tests {
hash_aggregate::{AggregateMode, HashAggregateExec},
hash_join::{HashJoinExec, PartitionMode},
limit::{GlobalLimitExec, LocalLimitExec},
- AggregateExpr, ColumnarValue, Distribution, ExecutionPlan, Partitioning,
- PhysicalExpr,
+ AggregateExpr, ExecutionPlan, Partitioning, PhysicalExpr,
},
scalar::ScalarValue,
};
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index e6f4b59..e3b60e0 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -26,54 +26,26 @@ use std::{
sync::Arc,
};
-use datafusion::physical_plan::hash_join::{HashJoinExec, PartitionMode};
-use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
-use datafusion::physical_plan::projection::ProjectionExec;
-use datafusion::physical_plan::sorts::sort::SortExec;
-use datafusion::physical_plan::{cross_join::CrossJoinExec, ColumnStatistics};
+use datafusion::physical_plan::expressions::{CastExpr, TryCastExpr};
+use datafusion::physical_plan::ColumnStatistics;
use datafusion::physical_plan::{
expressions::{
CaseExpr, InListExpr, IsNotNullExpr, IsNullExpr, NegativeExpr, NotExpr,
},
Statistics,
};
-use datafusion::physical_plan::{
- expressions::{CastExpr, TryCastExpr},
- file_format::ParquetExec,
-};
-use datafusion::physical_plan::{file_format::AvroExec, filter::FilterExec};
-use datafusion::physical_plan::{
- file_format::FileScanConfig, hash_aggregate::AggregateMode,
-};
-use datafusion::{
- datasource::PartitionedFile, physical_plan::coalesce_batches::CoalesceBatchesExec,
-};
-use datafusion::{logical_plan::JoinType, physical_plan::file_format::CsvExec};
-use datafusion::{
- physical_plan::expressions::{Count, Literal},
- scalar::ScalarValue,
-};
-use datafusion::physical_plan::{
- empty::EmptyExec,
- expressions::{Avg, BinaryExpr, Column, Max, Min, Sum},
- Partitioning,
-};
-use datafusion::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr};
+use datafusion::datasource::PartitionedFile;
+use datafusion::physical_plan::file_format::FileScanConfig;
+
+use datafusion::physical_plan::expressions::{Count, Literal};
-use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
-use protobuf::physical_plan_node::PhysicalPlanType;
+use datafusion::physical_plan::expressions::{Avg, BinaryExpr, Column, Max, Min, Sum};
+use datafusion::physical_plan::{AggregateExpr, PhysicalExpr};
-use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
-use crate::serde::scheduler::PartitionLocation;
use crate::serde::{protobuf, BallistaError};
-use crate::{
- execution_plans::{ShuffleReaderExec, ShuffleWriterExec, UnresolvedShuffleExec},
- serde::byte_to_string,
-};
-use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+
use datafusion::physical_plan::functions::{BuiltinScalarFunction, ScalarFunctionExpr};
-use datafusion::physical_plan::repartition::RepartitionExec;
impl TryInto<protobuf::PhysicalExprNode> for Arc<dyn AggregateExpr> {
type Error = BallistaError;
diff --git a/ballista/rust/core/src/serde/scheduler/from_proto.rs b/ballista/rust/core/src/serde/scheduler/from_proto.rs
index 8d4e279..b401f1f 100644
--- a/ballista/rust/core/src/serde/scheduler/from_proto.rs
+++ b/ballista/rust/core/src/serde/scheduler/from_proto.rs
@@ -15,17 +15,12 @@
// specific language governing permissions and limitations
// under the License.
-use std::{collections::HashMap, convert::TryInto};
+use std::convert::TryInto;
use crate::error::BallistaError;
use crate::serde::protobuf;
use crate::serde::protobuf::action::ActionType;
-use crate::serde::scheduler::{
- Action, ExecutePartition, PartitionId, PartitionLocation, PartitionStats,
-};
-
-use datafusion::logical_plan::LogicalPlan;
-use uuid::Uuid;
+use crate::serde::scheduler::{Action, PartitionId, PartitionLocation, PartitionStats};
impl TryInto<Action> for protobuf::Action {
type Error = BallistaError;
diff --git a/ballista/rust/core/src/serde/scheduler/mod.rs b/ballista/rust/core/src/serde/scheduler/mod.rs
index a03811c..c304382 100644
--- a/ballista/rust/core/src/serde/scheduler/mod.rs
+++ b/ballista/rust/core/src/serde/scheduler/mod.rs
@@ -18,14 +18,13 @@
use std::{collections::HashMap, fmt, sync::Arc};
use datafusion::arrow::array::{
- ArrayBuilder, ArrayRef, StructArray, StructBuilder, UInt64Array, UInt64Builder,
+ ArrayBuilder, StructArray, StructBuilder, UInt64Array, UInt64Builder,
};
-use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-use datafusion::logical_plan::LogicalPlan;
+use datafusion::arrow::datatypes::{DataType, Field};
+
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::Partitioning;
use serde::Serialize;
-use uuid::Uuid;
use super::protobuf;
use crate::error::BallistaError;
diff --git a/ballista/rust/core/src/serde/scheduler/to_proto.rs b/ballista/rust/core/src/serde/scheduler/to_proto.rs
index 71a02d3..4c1c5d1 100644
--- a/ballista/rust/core/src/serde/scheduler/to_proto.rs
+++ b/ballista/rust/core/src/serde/scheduler/to_proto.rs
@@ -20,9 +20,7 @@ use std::convert::TryInto;
use crate::error::BallistaError;
use crate::serde::protobuf;
use crate::serde::protobuf::action::ActionType;
-use crate::serde::scheduler::{
- Action, ExecutePartition, PartitionId, PartitionLocation, PartitionStats,
-};
+use crate::serde::scheduler::{Action, PartitionId, PartitionLocation, PartitionStats};
use datafusion::physical_plan::Partitioning;
impl TryInto<protobuf::Action> for Action {
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index fae3826..b612ddb 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -15,10 +15,9 @@
// specific language governing permissions and limitations
// under the License.
-use std::collections::HashMap;
use std::io::{BufWriter, Write};
use std::marker::PhantomData;
-use std::ops::Deref;
+
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::{fs::File, pin::Pin};
@@ -35,38 +34,27 @@ use async_trait::async_trait;
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::error::Result as ArrowResult;
use datafusion::arrow::{
- array::{
- ArrayBuilder, ArrayRef, StructArray, StructBuilder, UInt64Array, UInt64Builder,
- },
- datatypes::{DataType, Field, SchemaRef},
- ipc::reader::FileReader,
- ipc::writer::FileWriter,
- record_batch::RecordBatch,
+ datatypes::SchemaRef, ipc::writer::FileWriter, record_batch::RecordBatch,
};
use datafusion::error::DataFusionError;
use datafusion::execution::context::{
ExecutionConfig, ExecutionContext, ExecutionContextState, QueryPlanner,
};
-use datafusion::logical_plan::{LogicalPlan, Operator, TableScan};
-use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
-use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
-use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;
+use datafusion::logical_plan::LogicalPlan;
+
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::common::batch_byte_size;
use datafusion::physical_plan::empty::EmptyExec;
-use datafusion::physical_plan::expressions::{BinaryExpr, Column, Literal};
+
use datafusion::physical_plan::file_format::{CsvExec, ParquetExec};
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
use datafusion::physical_plan::hash_join::HashJoinExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::sorts::sort::SortExec;
-use datafusion::physical_plan::{
- metrics, AggregateExpr, ExecutionPlan, Metric, PhysicalExpr, RecordBatchStream,
-};
-use futures::{future, Stream, StreamExt};
-use std::time::Instant;
+use datafusion::physical_plan::{metrics, ExecutionPlan, RecordBatchStream};
+use futures::{Stream, StreamExt};
/// Stream data to disk in Arrow IPC format