You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2020/12/13 12:31:28 UTC
[arrow] branch master updated: ARROW-10893: [Rust] [DataFusion] More clippy lints
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 989757f ARROW-10893: [Rust] [DataFusion] More clippy lints
989757f is described below
commit 989757f68764830956ba11bf8d9fa7e19c114307
Author: Neville Dipale <ne...@gmail.com>
AuthorDate: Sun Dec 13 07:30:02 2020 -0500
ARROW-10893: [Rust] [DataFusion] More clippy lints
This addresses more clippy lints, especially the ones that `cargo clippy --fix -Z unstable-options --allow-dirty` can fix automatically.
Closes #8905 from nevi-me/ARROW-10893
Authored-by: Neville Dipale <ne...@gmail.com>
Signed-off-by: Andrew Lamb <an...@nerdnetworks.org>
---
rust/datafusion/src/datasource/memory.rs | 7 +--
rust/datafusion/src/execution/context.rs | 21 +++----
rust/datafusion/src/lib.rs | 18 +-----
rust/datafusion/src/logical_plan/builder.rs | 4 +-
rust/datafusion/src/logical_plan/dfschema.rs | 14 ++---
rust/datafusion/src/optimizer/filter_push_down.rs | 16 ++---
.../src/optimizer/projection_push_down.rs | 42 ++++++-------
rust/datafusion/src/optimizer/utils.rs | 8 +--
.../src/physical_plan/array_expressions.rs | 2 +-
rust/datafusion/src/physical_plan/common.rs | 6 +-
rust/datafusion/src/physical_plan/csv.rs | 7 +--
.../src/physical_plan/datetime_expressions.rs | 7 +--
.../src/physical_plan/distinct_expressions.rs | 28 ++++-----
rust/datafusion/src/physical_plan/expressions.rs | 23 ++++---
rust/datafusion/src/physical_plan/functions.rs | 2 +-
.../datafusion/src/physical_plan/hash_aggregate.rs | 25 ++++----
rust/datafusion/src/physical_plan/hash_utils.rs | 6 +-
rust/datafusion/src/physical_plan/limit.rs | 4 +-
rust/datafusion/src/physical_plan/mod.rs | 36 +++++------
rust/datafusion/src/physical_plan/planner.rs | 6 +-
.../src/physical_plan/string_expressions.rs | 2 +-
rust/datafusion/src/physical_plan/type_coercion.rs | 71 ++++++++--------------
rust/datafusion/src/scalar.rs | 35 +++++------
rust/datafusion/src/sql/parser.rs | 3 +-
rust/datafusion/src/sql/planner.rs | 6 +-
rust/datafusion/src/test/mod.rs | 5 +-
26 files changed, 166 insertions(+), 238 deletions(-)
diff --git a/rust/datafusion/src/datasource/memory.rs b/rust/datafusion/src/datasource/memory.rs
index f00a381..e34e2d1 100644
--- a/rust/datafusion/src/datasource/memory.rs
+++ b/rust/datafusion/src/datasource/memory.rs
@@ -241,7 +241,7 @@ mod tests {
Err(DataFusionError::Internal(e)) => {
assert_eq!("\"Projection index out of range\"", format!("{:?}", e))
}
- _ => assert!(false, "Scan should failed on invalid projection"),
+ _ => panic!("Scan should failed on invalid projection"),
};
Ok(())
@@ -275,10 +275,7 @@ mod tests {
"\"Mismatch between schema and batches\"",
format!("{:?}", e)
),
- _ => assert!(
- false,
- "MemTable::new should have failed due to schema mismatch"
- ),
+ _ => panic!("MemTable::new should have failed due to schema mismatch"),
}
Ok(())
diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index 2ecccb7..d74c7db 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -106,7 +106,7 @@ impl ExecutionContext {
/// Create a new execution context using the provided configuration
pub fn with_config(config: ExecutionConfig) -> Self {
- let ctx = Self {
+ Self {
state: ExecutionContextState {
datasources: HashMap::new(),
scalar_functions: HashMap::new(),
@@ -114,8 +114,7 @@ impl ExecutionContext {
aggregate_functions: HashMap::new(),
config,
},
- };
- ctx
+ }
}
/// Get the configuration of this execution context
@@ -514,15 +513,11 @@ impl SchemaProvider for ExecutionContextState {
}
fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
- self.scalar_functions
- .get(name)
- .and_then(|func| Some(func.clone()))
+ self.scalar_functions.get(name).cloned()
}
fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>> {
- self.aggregate_functions
- .get(name)
- .and_then(|func| Some(func.clone()))
+ self.aggregate_functions.get(name).cloned()
}
}
@@ -676,9 +671,9 @@ mod tests {
assert_eq!(table_schema.fields().len(), 2);
assert_eq!(projected_schema.fields().len(), 1);
}
- _ => assert!(false, "input to projection should be TableScan"),
+ _ => panic!("input to projection should be TableScan"),
},
- _ => assert!(false, "expect optimized_plan to be projection"),
+ _ => panic!("expect optimized_plan to be projection"),
}
let expected = "Projection: #c2\
@@ -754,9 +749,9 @@ mod tests {
assert_eq!(table_schema.fields().len(), 3);
assert_eq!(projected_schema.fields().len(), 1);
}
- _ => assert!(false, "input to projection should be InMemoryScan"),
+ _ => panic!("input to projection should be InMemoryScan"),
},
- _ => assert!(false, "expect optimized_plan to be projection"),
+ _ => panic!("expect optimized_plan to be projection"),
}
let expected = "Projection: #b\
diff --git a/rust/datafusion/src/lib.rs b/rust/datafusion/src/lib.rs
index 586250a..7f01a6f 100644
--- a/rust/datafusion/src/lib.rs
+++ b/rust/datafusion/src/lib.rs
@@ -17,30 +17,14 @@
#![warn(missing_docs)]
// Clippy lints, some should be disabled incrementally
#![allow(
- clippy::assertions_on_constants,
- clippy::bind_instead_of_map,
- clippy::blocks_in_if_conditions,
- clippy::collapsible_if,
- clippy::explicit_counter_loop,
clippy::field_reassign_with_default,
clippy::float_cmp,
- clippy::into_iter_on_ref,
- clippy::len_zero,
- clippy::let_and_return,
- clippy::map_collect_result_unit,
- clippy::match_like_matches_macro,
- clippy::match_ref_pats,
clippy::module_inception,
clippy::needless_lifetimes,
clippy::needless_range_loop,
- clippy::needless_return,
clippy::new_without_default,
clippy::ptr_arg,
- clippy::single_match,
- clippy::stable_sort_primitive,
- clippy::type_complexity,
- clippy::unit_arg,
- clippy::zero_prefixed_literal
+ clippy::type_complexity
)]
//! DataFusion is an extensible query execution framework that uses
diff --git a/rust/datafusion/src/logical_plan/builder.rs b/rust/datafusion/src/logical_plan/builder.rs
index 942de72..dc75814 100644
--- a/rust/datafusion/src/logical_plan/builder.rs
+++ b/rust/datafusion/src/logical_plan/builder.rs
@@ -344,7 +344,7 @@ fn validate_unique_names(
input_schema: &DFSchema,
) -> Result<()> {
let mut unique_names = HashMap::new();
- expressions.iter().enumerate().map(|(position, expr)| {
+ expressions.iter().enumerate().try_for_each(|(position, expr)| {
let name = expr.name(input_schema)?;
match unique_names.get(&name) {
None => {
@@ -361,7 +361,7 @@ fn validate_unique_names(
))
}
}
- }).collect::<Result<()>>()
+ })
}
#[cfg(test)]
diff --git a/rust/datafusion/src/logical_plan/dfschema.rs b/rust/datafusion/src/logical_plan/dfschema.rs
index 5d72df8..b6c1d21 100644
--- a/rust/datafusion/src/logical_plan/dfschema.rs
+++ b/rust/datafusion/src/logical_plan/dfschema.rs
@@ -55,13 +55,11 @@ impl DFSchema {
field.qualified_name()
)));
}
- } else {
- if !unqualified_names.insert(field.name()) {
- return Err(DataFusionError::Plan(format!(
- "Schema contains duplicate unqualified field name '{}'",
- field.name()
- )));
- }
+ } else if !unqualified_names.insert(field.name()) {
+ return Err(DataFusionError::Plan(format!(
+ "Schema contains duplicate unqualified field name '{}'",
+ field.name()
+ )));
}
}
@@ -217,7 +215,7 @@ impl TryFrom<Schema> for DFSchema {
Self::new(
schema
.fields()
- .into_iter()
+ .iter()
.map(|f| DFField {
field: f.clone(),
qualifier: None,
diff --git a/rust/datafusion/src/optimizer/filter_push_down.rs b/rust/datafusion/src/optimizer/filter_push_down.rs
index f095f8d..6a1e6d2 100644
--- a/rust/datafusion/src/optimizer/filter_push_down.rs
+++ b/rust/datafusion/src/optimizer/filter_push_down.rs
@@ -68,11 +68,10 @@ fn get_predicates<'a>(
.filters
.iter()
.filter(|(_, columns)| {
- columns
+ !columns
.intersection(used_columns)
.collect::<HashSet<_>>()
- .len()
- > 0
+ .is_empty()
})
.map(|&(ref a, ref b)| (a, b))
.unzip()
@@ -330,7 +329,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
impl OptimizerRule for FilterPushDown {
fn name(&self) -> &str {
- return "filter_push_down";
+ "filter_push_down"
}
fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
@@ -354,13 +353,10 @@ fn rewrite(expr: &Expr, projection: &HashMap<String, Expr>) -> Result<Expr> {
.map(|e| rewrite(e, &projection))
.collect::<Result<Vec<_>>>()?;
- match expr {
- Expr::Column(name) => {
- if let Some(expr) = projection.get(name) {
- return Ok(expr.clone());
- }
+ if let Expr::Column(name) = expr {
+ if let Some(expr) = projection.get(name) {
+ return Ok(expr.clone());
}
- _ => {}
}
utils::rewrite_expression(&expr, &expressions)
diff --git a/rust/datafusion/src/optimizer/projection_push_down.rs b/rust/datafusion/src/optimizer/projection_push_down.rs
index 72b14bc..c1240d9 100644
--- a/rust/datafusion/src/optimizer/projection_push_down.rs
+++ b/rust/datafusion/src/optimizer/projection_push_down.rs
@@ -40,11 +40,11 @@ impl OptimizerRule for ProjectionPushDown {
.iter()
.map(|f| f.name().clone())
.collect::<HashSet<String>>();
- return optimize_plan(self, plan, &required_columns, false);
+ optimize_plan(self, plan, &required_columns, false)
}
fn name(&self) -> &str {
- return "projection_push_down";
+ "projection_push_down"
}
}
@@ -95,7 +95,7 @@ fn get_projected_schema(
}
// sort the projection otherwise we get non-deterministic behavior
- projection.sort();
+ projection.sort_unstable();
// create the projected schema
let mut projected_fields: Vec<DFField> = Vec::with_capacity(projection.len());
@@ -132,7 +132,7 @@ fn optimize_plan(
.fields()
.iter()
.enumerate()
- .map(|(i, field)| {
+ .try_for_each(|(i, field)| {
if required_columns.contains(field.name()) {
new_expr.push(expr[i].clone());
new_fields.push(field.clone());
@@ -142,12 +142,11 @@ fn optimize_plan(
} else {
Ok(())
}
- })
- .collect::<Result<()>>()?;
+ })?;
let new_input =
optimize_plan(optimizer, &input, &new_required_columns, true)?;
- if new_fields.len() == 0 {
+ if new_fields.is_empty() {
// no need for an expression at all
Ok(new_input)
} else {
@@ -203,22 +202,19 @@ fn optimize_plan(
// Gather all columns needed for expressions in this Aggregate
let mut new_aggr_expr = Vec::new();
- aggr_expr
- .iter()
- .map(|expr| {
- let name = &expr.name(&schema)?;
-
- if required_columns.contains(name) {
- new_aggr_expr.push(expr.clone());
- new_required_columns.insert(name.clone());
-
- // add to the new set of required columns
- utils::expr_to_column_names(expr, &mut new_required_columns)
- } else {
- Ok(())
- }
- })
- .collect::<Result<()>>()?;
+ aggr_expr.iter().try_for_each(|expr| {
+ let name = &expr.name(&schema)?;
+
+ if required_columns.contains(name) {
+ new_aggr_expr.push(expr.clone());
+ new_required_columns.insert(name.clone());
+
+ // add to the new set of required columns
+ utils::expr_to_column_names(expr, &mut new_required_columns)
+ } else {
+ Ok(())
+ }
+ })?;
let new_schema = DFSchema::new(
schema
diff --git a/rust/datafusion/src/optimizer/utils.rs b/rust/datafusion/src/optimizer/utils.rs
index 7a7d89c..f482a17 100644
--- a/rust/datafusion/src/optimizer/utils.rs
+++ b/rust/datafusion/src/optimizer/utils.rs
@@ -438,7 +438,7 @@ mod tests {
}
fn name(&self) -> &str {
- return "test_optimizer";
+ "test_optimizer"
}
}
@@ -476,11 +476,7 @@ mod tests {
];
assert_eq!(*stringified_plans, expected_stringified_plans);
}
- _ => assert!(
- false,
- "Expected explain plan but got {:?}",
- optimized_explain
- ),
+ _ => panic!("Expected explain plan but got {:?}", optimized_explain),
}
Ok(())
diff --git a/rust/datafusion/src/physical_plan/array_expressions.rs b/rust/datafusion/src/physical_plan/array_expressions.rs
index 7bdb742..9af81ad 100644
--- a/rust/datafusion/src/physical_plan/array_expressions.rs
+++ b/rust/datafusion/src/physical_plan/array_expressions.rs
@@ -61,7 +61,7 @@ macro_rules! array {
/// put values in an array.
pub fn array(args: &[ArrayRef]) -> Result<ArrayRef> {
// do not accept 0 arguments.
- if args.len() == 0 {
+ if args.is_empty() {
return Err(DataFusionError::Internal(
"array requires at least one argument".to_string(),
));
diff --git a/rust/datafusion/src/physical_plan/common.rs b/rust/datafusion/src/physical_plan/common.rs
index 3c63a44..2b96c45 100644
--- a/rust/datafusion/src/physical_plan/common.rs
+++ b/rust/datafusion/src/physical_plan/common.rs
@@ -101,10 +101,8 @@ pub fn build_file_list(dir: &str, filenames: &mut Vec<String>, ext: &str) -> Res
if let Some(path_name) = path.to_str() {
if path.is_dir() {
build_file_list(path_name, filenames, ext)?;
- } else {
- if path_name.ends_with(ext) {
- filenames.push(path_name.to_string());
- }
+ } else if path_name.ends_with(ext) {
+ filenames.push(path_name.to_string());
}
} else {
return Err(DataFusionError::Plan("Invalid path".to_string()));
diff --git a/rust/datafusion/src/physical_plan/csv.rs b/rust/datafusion/src/physical_plan/csv.rs
index 857e874..54adb38 100644
--- a/rust/datafusion/src/physical_plan/csv.rs
+++ b/rust/datafusion/src/physical_plan/csv.rs
@@ -87,11 +87,8 @@ impl<'a> CsvReadOptions<'a> {
/// Configure delimiter setting with Option, None value will be ignored
pub fn delimiter_option(mut self, delimiter: Option<u8>) -> Self {
- match delimiter {
- Some(d) => {
- self.delimiter = d;
- }
- _ => (),
+ if let Some(d) = delimiter {
+ self.delimiter = d;
}
self
}
diff --git a/rust/datafusion/src/physical_plan/datetime_expressions.rs b/rust/datafusion/src/physical_plan/datetime_expressions.rs
index de86f5f..a12b00c 100644
--- a/rust/datafusion/src/physical_plan/datetime_expressions.rs
+++ b/rust/datafusion/src/physical_plan/datetime_expressions.rs
@@ -280,7 +280,7 @@ mod tests {
// timezone the test machine is running. Thus it is still
// somewhat suceptable to bugs in the use of chrono
let naive_datetime = NaiveDateTime::new(
- NaiveDate::from_ymd(2020, 09, 08),
+ NaiveDate::from_ymd(2020, 9, 8),
NaiveTime::from_hms_nano(13, 42, 29, 190855),
);
@@ -298,7 +298,7 @@ mod tests {
// Also ensure that parsing timestamps with no fractional
// second part works as well
let naive_datetime_whole_secs = NaiveDateTime::new(
- NaiveDate::from_ymd(2020, 09, 08),
+ NaiveDate::from_ymd(2020, 9, 8),
NaiveTime::from_hms(13, 42, 29),
);
@@ -342,8 +342,7 @@ mod tests {
fn expect_timestamp_parse_error(s: &str, expected_err: &str) {
match string_to_timestamp_nanos(s) {
- Ok(v) => assert!(
- false,
+ Ok(v) => panic!(
"Expected error '{}' while parsing '{}', but parsed {} instead",
expected_err, s, v
),
diff --git a/rust/datafusion/src/physical_plan/distinct_expressions.rs b/rust/datafusion/src/physical_plan/distinct_expressions.rs
index bbccc3b..9aa639a 100644
--- a/rust/datafusion/src/physical_plan/distinct_expressions.rs
+++ b/rust/datafusion/src/physical_plan/distinct_expressions.rs
@@ -124,7 +124,7 @@ impl Accumulator for DistinctCountAccumulator {
}
fn merge(&mut self, states: &Vec<ScalarValue>) -> Result<()> {
- if states.len() == 0 {
+ if states.is_empty() {
return Ok(());
}
@@ -138,15 +138,13 @@ impl Accumulator for DistinctCountAccumulator {
})
.collect::<Result<Vec<_>>>()?;
- (0..col_values[0].len())
- .map(|row_index| {
- let row_values = col_values
- .iter()
- .map(|col| col[row_index].clone())
- .collect::<Vec<_>>();
- self.update(&row_values)
- })
- .collect::<Result<_>>()
+ (0..col_values[0].len()).try_for_each(|row_index| {
+ let row_values = col_values
+ .iter()
+ .map(|col| col[row_index].clone())
+ .collect::<Vec<_>>();
+ self.update(&row_values)
+ })
}
fn state(&self) -> Result<Vec<ScalarValue>> {
@@ -178,12 +176,10 @@ impl Accumulator for DistinctCountAccumulator {
fn evaluate(&self) -> Result<ScalarValue> {
match &self.count_data_type {
DataType::UInt64 => Ok(ScalarValue::UInt64(Some(self.values.len() as u64))),
- t => {
- return Err(DataFusionError::Internal(format!(
- "Invalid data type {:?} for count distinct aggregation",
- t
- )))
- }
+ t => Err(DataFusionError::Internal(format!(
+ "Invalid data type {:?} for count distinct aggregation",
+ t
+ ))),
}
}
}
diff --git a/rust/datafusion/src/physical_plan/expressions.rs b/rust/datafusion/src/physical_plan/expressions.rs
index c294eed..79045d9 100644
--- a/rust/datafusion/src/physical_plan/expressions.rs
+++ b/rust/datafusion/src/physical_plan/expressions.rs
@@ -1638,7 +1638,7 @@ impl fmt::Display for NotExpr {
impl PhysicalExpr for NotExpr {
fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
- return Ok(DataType::Boolean);
+ Ok(DataType::Boolean)
}
fn nullable(&self, input_schema: &Schema) -> Result<bool> {
@@ -1791,7 +1791,7 @@ impl fmt::Display for IsNullExpr {
}
impl PhysicalExpr for IsNullExpr {
fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
- return Ok(DataType::Boolean);
+ Ok(DataType::Boolean)
}
fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
@@ -1836,7 +1836,7 @@ impl fmt::Display for IsNotNullExpr {
}
impl PhysicalExpr for IsNotNullExpr {
fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
- return Ok(DataType::Boolean);
+ Ok(DataType::Boolean)
}
fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
@@ -1911,7 +1911,7 @@ impl CaseExpr {
when_then_expr: &[(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)],
else_expr: Option<Arc<dyn PhysicalExpr>>,
) -> Result<Self> {
- if when_then_expr.len() == 0 {
+ if when_then_expr.is_empty() {
Err(DataFusionError::Execution(
"There must be at least one WHEN clause".to_string(),
))
@@ -2280,11 +2280,16 @@ pub struct CastExpr {
/// Determine if a DataType is signed numeric or not
pub fn is_signed_numeric(dt: &DataType) -> bool {
- match dt {
- DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => true,
- DataType::Float16 | DataType::Float32 | DataType::Float64 => true,
- _ => false,
- }
+ matches!(
+ dt,
+ DataType::Int8
+ | DataType::Int16
+ | DataType::Int32
+ | DataType::Int64
+ | DataType::Float16
+ | DataType::Float32
+ | DataType::Float64
+ )
}
/// Determine if a DataType is numeric or not
diff --git a/rust/datafusion/src/physical_plan/functions.rs b/rust/datafusion/src/physical_plan/functions.rs
index d296512..fb6c63b 100644
--- a/rust/datafusion/src/physical_plan/functions.rs
+++ b/rust/datafusion/src/physical_plan/functions.rs
@@ -180,7 +180,7 @@ pub fn return_type(
// verify that this is a valid set of data types for this function
data_types(&arg_types, &signature(fun))?;
- if arg_types.len() == 0 {
+ if arg_types.is_empty() {
// functions currently cannot be evaluated without arguments, as they can't
// know the number of rows to return.
return Err(DataFusionError::Plan(format!(
diff --git a/rust/datafusion/src/physical_plan/hash_aggregate.rs b/rust/datafusion/src/physical_plan/hash_aggregate.rs
index 27deb94..87ded23 100644
--- a/rust/datafusion/src/physical_plan/hash_aggregate.rs
+++ b/rust/datafusion/src/physical_plan/hash_aggregate.rs
@@ -290,11 +290,10 @@ fn group_aggregate_batch(
// 2.5 clear indices
accumulators
.iter_mut()
- // 2.1
- .map(|(_, (_, accumulator_set, indices))| {
+ .try_for_each(|(_, (_, accumulator_set, indices))| {
// 2.2
accumulator_set
- .into_iter()
+ .iter_mut()
.zip(&aggr_input_values)
.map(|(accumulator, aggr_array)| {
(
@@ -313,19 +312,19 @@ fn group_aggregate_batch(
.collect::<Vec<ArrayRef>>(),
)
})
- // 2.4
- .map(|(accumulator, values)| match mode {
+ .try_for_each(|(accumulator, values)| match mode {
AggregateMode::Partial => accumulator.update_batch(&values),
AggregateMode::Final => {
// note: the aggregation here is over states, not values, thus the merge
accumulator.merge_batch(&values)
}
})
- .collect::<Result<()>>()
// 2.5
- .and(Ok(indices.clear()))
- })
- .collect::<Result<()>>()?;
+ .and({
+ indices.clear();
+ Ok(())
+ })
+ })?;
Ok(accumulators)
}
@@ -684,7 +683,7 @@ fn create_batch_from_map(
// 4.
.collect::<ArrowResult<Vec<Vec<ArrayRef>>>>()?;
- let batch = if arrays.len() != 0 {
+ let batch = if !arrays.is_empty() {
// 5.
let columns = concatenate(arrays)?;
RecordBatch::try_new(Arc::new(output_schema.to_owned()), columns)?
@@ -716,8 +715,8 @@ fn finalize_aggregation(
.iter()
.map(|accumulator| accumulator.state())
.map(|value| {
- value.and_then(|e| {
- Ok(e.iter().map(|v| v.to_array()).collect::<Vec<ArrayRef>>())
+ value.map(|e| {
+ e.iter().map(|v| v.to_array()).collect::<Vec<ArrayRef>>()
})
})
.collect::<Result<Vec<_>>>()?;
@@ -727,7 +726,7 @@ fn finalize_aggregation(
// merge the state to the final value
accumulators
.iter()
- .map(|accumulator| accumulator.evaluate().and_then(|v| Ok(v.to_array())))
+ .map(|accumulator| accumulator.evaluate().map(|v| v.to_array()))
.collect::<Result<Vec<ArrayRef>>>()
}
}
diff --git a/rust/datafusion/src/physical_plan/hash_utils.rs b/rust/datafusion/src/physical_plan/hash_utils.rs
index 03d37bd..5c13dfc 100644
--- a/rust/datafusion/src/physical_plan/hash_utils.rs
+++ b/rust/datafusion/src/physical_plan/hash_utils.rs
@@ -52,7 +52,7 @@ fn check_join_set_is_valid(
right: &HashSet<String>,
on: &JoinOn,
) -> Result<()> {
- if on.len() == 0 {
+ if on.is_empty() {
return Err(DataFusionError::Plan(
"The 'on' clause of a join cannot be empty".to_string(),
));
@@ -63,7 +63,7 @@ fn check_join_set_is_valid(
let on_right = &on.iter().map(|on| on.1.to_string()).collect::<HashSet<_>>();
let right_missing = on_right.difference(right).collect::<HashSet<_>>();
- if (left_missing.len() > 0) | (right_missing.len() > 0) {
+ if !left_missing.is_empty() | !right_missing.is_empty() {
return Err(DataFusionError::Plan(format!(
"The left or right side of the join does not have all columns on \"on\": \nMissing on the left: {:?}\nMissing on the right: {:?}",
left_missing,
@@ -78,7 +78,7 @@ fn check_join_set_is_valid(
let collisions = left.intersection(&remaining).collect::<HashSet<_>>();
- if collisions.len() > 0 {
+ if !collisions.is_empty() {
return Err(DataFusionError::Plan(format!(
"The left schema and the right schema have the following columns with the same name without being on the ON statement: {:?}. Consider aliasing them.",
collisions,
diff --git a/rust/datafusion/src/physical_plan/limit.rs b/rust/datafusion/src/physical_plan/limit.rs
index 78eb896..aebaf34 100644
--- a/rust/datafusion/src/physical_plan/limit.rs
+++ b/rust/datafusion/src/physical_plan/limit.rs
@@ -202,10 +202,10 @@ impl LimitStream {
fn stream_limit(&mut self, batch: RecordBatch) -> Option<RecordBatch> {
if self.current_len == self.limit {
- return None;
+ None
} else if self.current_len + batch.num_rows() <= self.limit {
self.current_len += batch.num_rows();
- return Some(batch);
+ Some(batch)
} else {
let batch_rows = self.limit - self.current_len;
self.current_len = self.limit;
diff --git a/rust/datafusion/src/physical_plan/mod.rs b/rust/datafusion/src/physical_plan/mod.rs
index 8a5dfd9..b7cddbc 100644
--- a/rust/datafusion/src/physical_plan/mod.rs
+++ b/rust/datafusion/src/physical_plan/mod.rs
@@ -185,18 +185,16 @@ pub trait Accumulator: Send + Sync + Debug {
/// updates the accumulator's state from a vector of arrays.
fn update_batch(&mut self, values: &Vec<ArrayRef>) -> Result<()> {
- if values.len() == 0 {
+ if values.is_empty() {
return Ok(());
};
- (0..values[0].len())
- .map(|index| {
- let v = values
- .iter()
- .map(|array| ScalarValue::try_from_array(array, index))
- .collect::<Result<Vec<_>>>()?;
- self.update(&v)
- })
- .collect::<Result<_>>()
+ (0..values[0].len()).try_for_each(|index| {
+ let v = values
+ .iter()
+ .map(|array| ScalarValue::try_from_array(array, index))
+ .collect::<Result<Vec<_>>>()?;
+ self.update(&v)
+ })
}
/// updates the accumulator's state from a vector of scalars.
@@ -204,18 +202,16 @@ pub trait Accumulator: Send + Sync + Debug {
/// updates the accumulator's state from a vector of states.
fn merge_batch(&mut self, states: &Vec<ArrayRef>) -> Result<()> {
- if states.len() == 0 {
+ if states.is_empty() {
return Ok(());
};
- (0..states[0].len())
- .map(|index| {
- let v = states
- .iter()
- .map(|array| ScalarValue::try_from_array(array, index))
- .collect::<Result<Vec<_>>>()?;
- self.merge(&v)
- })
- .collect::<Result<_>>()
+ (0..states[0].len()).try_for_each(|index| {
+ let v = states
+ .iter()
+ .map(|array| ScalarValue::try_from_array(array, index))
+ .collect::<Result<Vec<_>>>()?;
+ self.merge(&v)
+ })
}
/// returns its value based on its current state.
diff --git a/rust/datafusion/src/physical_plan/planner.rs b/rust/datafusion/src/physical_plan/planner.rs
index 86af6f8..cae2e4e 100644
--- a/rust/datafusion/src/physical_plan/planner.rs
+++ b/rust/datafusion/src/physical_plan/planner.rs
@@ -105,7 +105,7 @@ impl DefaultPhysicalPlanner {
.map(|child| self.optimize_plan(child.clone(), ctx_state))
.collect::<Result<Vec<_>>>()?;
- if children.len() == 0 {
+ if children.is_empty() {
// leaf node, children cannot be replaced
Ok(plan.clone())
} else {
@@ -782,7 +782,7 @@ mod tests {
let expected_error = "DefaultPhysicalPlanner does not know how to plan NoOp";
match plan {
- Ok(_) => assert!(false, "Expected planning failure"),
+ Ok(_) => panic!("Expected planning failure"),
Err(e) => assert!(
e.to_string().contains(expected_error),
"Error '{}' did not contain expected error '{}'",
@@ -826,7 +826,7 @@ mod tests {
dict_is_ordered: false }\
], metadata: {} }";
match plan {
- Ok(_) => assert!(false, "Expected planning failure"),
+ Ok(_) => panic!("Expected planning failure"),
Err(e) => assert!(
e.to_string().contains(expected_error),
"Error '{}' did not contain expected error '{}'",
diff --git a/rust/datafusion/src/physical_plan/string_expressions.rs b/rust/datafusion/src/physical_plan/string_expressions.rs
index ea70c8d..fb65f91 100644
--- a/rust/datafusion/src/physical_plan/string_expressions.rs
+++ b/rust/datafusion/src/physical_plan/string_expressions.rs
@@ -36,7 +36,7 @@ pub fn concatenate(args: &[ArrayRef]) -> Result<StringArray> {
// downcast all arguments to strings
let args = downcast_vec!(args, StringArray).collect::<Result<Vec<&StringArray>>>()?;
// do not accept 0 arguments.
- if args.len() == 0 {
+ if args.is_empty() {
return Err(DataFusionError::Internal(
"Concatenate was called with 0 arguments. It requires at least one."
.to_string(),
diff --git a/rust/datafusion/src/physical_plan/type_coercion.rs b/rust/datafusion/src/physical_plan/type_coercion.rs
index 55e24a7..91eaf65 100644
--- a/rust/datafusion/src/physical_plan/type_coercion.rs
+++ b/rust/datafusion/src/physical_plan/type_coercion.rs
@@ -149,50 +149,33 @@ fn maybe_data_types(
pub fn can_coerce_from(type_into: &DataType, type_from: &DataType) -> bool {
use self::DataType::*;
match type_into {
- Int8 => match type_from {
- Int8 => true,
- _ => false,
- },
- Int16 => match type_from {
- Int8 | Int16 | UInt8 => true,
- _ => false,
- },
- Int32 => match type_from {
- Int8 | Int16 | Int32 | UInt8 | UInt16 => true,
- _ => false,
- },
- Int64 => match type_from {
- Int8 | Int16 | Int32 | Int64 | UInt8 | UInt16 | UInt32 => true,
- _ => false,
- },
- UInt8 => match type_from {
- UInt8 => true,
- _ => false,
- },
- UInt16 => match type_from {
- UInt8 | UInt16 => true,
- _ => false,
- },
- UInt32 => match type_from {
- UInt8 | UInt16 | UInt32 => true,
- _ => false,
- },
- UInt64 => match type_from {
- UInt8 | UInt16 | UInt32 | UInt64 => true,
- _ => false,
- },
- Float32 => match type_from {
- Int8 | Int16 | Int32 | Int64 => true,
- UInt8 | UInt16 | UInt32 | UInt64 => true,
- Float32 => true,
- _ => false,
- },
- Float64 => match type_from {
- Int8 | Int16 | Int32 | Int64 => true,
- UInt8 | UInt16 | UInt32 | UInt64 => true,
- Float32 | Float64 => true,
- _ => false,
- },
+ Int8 => matches!(type_from, Int8),
+ Int16 => matches!(type_from, Int8 | Int16 | UInt8),
+ Int32 => matches!(type_from, Int8 | Int16 | Int32 | UInt8 | UInt16),
+ Int64 => matches!(
+ type_from,
+ Int8 | Int16 | Int32 | Int64 | UInt8 | UInt16 | UInt32
+ ),
+ UInt8 => matches!(type_from, UInt8),
+ UInt16 => matches!(type_from, UInt8 | UInt16),
+ UInt32 => matches!(type_from, UInt8 | UInt16 | UInt32),
+ UInt64 => matches!(type_from, UInt8 | UInt16 | UInt32 | UInt64),
+ Float32 => matches!(
+ type_from,
+ Int8 | Int16 | Int32 | Int64 | UInt8 | UInt16 | UInt32 | UInt64 | Float32
+ ),
+ Float64 => matches!(
+ type_from,
+ Int8 | Int16
+ | Int32
+ | Int64
+ | UInt8
+ | UInt16
+ | UInt32
+ | UInt64
+ | Float32
+ | Float64
+ ),
Utf8 => true,
_ => false,
}
diff --git a/rust/datafusion/src/scalar.rs b/rust/datafusion/src/scalar.rs
index d6d20b9..321271e 100644
--- a/rust/datafusion/src/scalar.rs
+++ b/rust/datafusion/src/scalar.rs
@@ -163,8 +163,7 @@ impl ScalarValue {
/// whether this value is null or not.
pub fn is_null(&self) -> bool {
- match *self {
- ScalarValue::Boolean(None)
+ matches!(*self, ScalarValue::Boolean(None)
| ScalarValue::UInt8(None)
| ScalarValue::UInt16(None)
| ScalarValue::UInt32(None)
@@ -177,9 +176,7 @@ impl ScalarValue {
| ScalarValue::Float64(None)
| ScalarValue::Utf8(None)
| ScalarValue::LargeUtf8(None)
- | ScalarValue::List(None, _) => true,
- _ => false,
- }
+ | ScalarValue::List(None, _))
}
/// Converts a scalar value into an 1-row array.
@@ -391,20 +388,20 @@ impl TryFrom<&DataType> for ScalarValue {
fn try_from(datatype: &DataType) -> Result<Self> {
Ok(match datatype {
- &DataType::Boolean => ScalarValue::Boolean(None),
- &DataType::Float64 => ScalarValue::Float64(None),
- &DataType::Float32 => ScalarValue::Float32(None),
- &DataType::Int8 => ScalarValue::Int8(None),
- &DataType::Int16 => ScalarValue::Int16(None),
- &DataType::Int32 => ScalarValue::Int32(None),
- &DataType::Int64 => ScalarValue::Int64(None),
- &DataType::UInt8 => ScalarValue::UInt8(None),
- &DataType::UInt16 => ScalarValue::UInt16(None),
- &DataType::UInt32 => ScalarValue::UInt32(None),
- &DataType::UInt64 => ScalarValue::UInt64(None),
- &DataType::Utf8 => ScalarValue::Utf8(None),
- &DataType::LargeUtf8 => ScalarValue::LargeUtf8(None),
- &DataType::List(ref nested_type) => {
+ DataType::Boolean => ScalarValue::Boolean(None),
+ DataType::Float64 => ScalarValue::Float64(None),
+ DataType::Float32 => ScalarValue::Float32(None),
+ DataType::Int8 => ScalarValue::Int8(None),
+ DataType::Int16 => ScalarValue::Int16(None),
+ DataType::Int32 => ScalarValue::Int32(None),
+ DataType::Int64 => ScalarValue::Int64(None),
+ DataType::UInt8 => ScalarValue::UInt8(None),
+ DataType::UInt16 => ScalarValue::UInt16(None),
+ DataType::UInt32 => ScalarValue::UInt32(None),
+ DataType::UInt64 => ScalarValue::UInt64(None),
+ DataType::Utf8 => ScalarValue::Utf8(None),
+ DataType::LargeUtf8 => ScalarValue::LargeUtf8(None),
+ DataType::List(ref nested_type) => {
ScalarValue::List(None, nested_type.data_type().clone())
}
_ => {
diff --git a/rust/datafusion/src/sql/parser.rs b/rust/datafusion/src/sql/parser.rs
index 0324445..e71d1a8 100644
--- a/rust/datafusion/src/sql/parser.rs
+++ b/rust/datafusion/src/sql/parser.rs
@@ -337,8 +337,7 @@ mod tests {
fn expect_parse_error(sql: &str, expected_error: &str) -> Result<(), ParserError> {
match DFParser::parse_sql(sql) {
Ok(statements) => {
- assert!(
- false,
+ panic!(
"Expected parse error for '{}', but was successful: {:?}",
sql, statements
);
diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs
index dd375dd..562e580 100644
--- a/rust/datafusion/src/sql/planner.rs
+++ b/rust/datafusion/src/sql/planner.rs
@@ -369,7 +369,7 @@ impl<'a, S: SchemaProvider> SqlToRel<'a, S> {
join_keys.push((r.as_str(), l.as_str()));
}
}
- if join_keys.len() == 0 {
+ if join_keys.is_empty() {
return Err(DataFusionError::NotImplemented(
"Cartesian joins are not supported".to_string(),
));
@@ -419,7 +419,7 @@ impl<'a, S: SchemaProvider> SqlToRel<'a, S> {
.collect();
// apply projection or aggregate
- let plan = if (select.group_by.len() > 0) | (aggr_expr.len() > 0) {
+ let plan = if !select.group_by.is_empty() | !aggr_expr.is_empty() {
self.aggregate(&plan, projection_expr, &select.group_by, aggr_expr)?
} else {
self.project(&plan, projection_expr)?
@@ -505,7 +505,7 @@ impl<'a, S: SchemaProvider> SqlToRel<'a, S> {
plan: &LogicalPlan,
order_by: &Vec<OrderByExpr>,
) -> Result<LogicalPlan> {
- if order_by.len() == 0 {
+ if order_by.is_empty() {
return Ok(plan.clone());
}
diff --git a/rust/datafusion/src/test/mod.rs b/rust/datafusion/src/test/mod.rs
index bfa931d..42bdf37 100644
--- a/rust/datafusion/src/test/mod.rs
+++ b/rust/datafusion/src/test/mod.rs
@@ -78,8 +78,7 @@ pub fn create_partitioned_csv(filename: &str, partitions: usize) -> Result<Strin
let f = File::open(&path)?;
let f = BufReader::new(f);
- let mut i = 0;
- for line in f.lines() {
+ for (i, line) in f.lines().enumerate() {
let line = line.unwrap();
if i == 0 {
@@ -94,8 +93,6 @@ pub fn create_partitioned_csv(filename: &str, partitions: usize) -> Result<Strin
writers[partition].write_all(line.as_bytes()).unwrap();
writers[partition].write_all(b"\n").unwrap();
}
-
- i += 1;
}
for w in writers.iter_mut() {
w.flush().unwrap();