You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2020/12/13 12:31:28 UTC

[arrow] branch master updated: ARROW-10893: [Rust] [DataFusion] More clippy lints

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 989757f  ARROW-10893: [Rust] [DataFusion] More clippy lints
989757f is described below

commit 989757f68764830956ba11bf8d9fa7e19c114307
Author: Neville Dipale <ne...@gmail.com>
AuthorDate: Sun Dec 13 07:30:02 2020 -0500

    ARROW-10893: [Rust] [DataFusion] More clippy lints
    
    This addresses more clippy lints, especially the ones that `cargo clippy --fix -Z unstable-options --allow-dirty` can address
    
    Closes #8905 from nevi-me/ARROW-10893
    
    Authored-by: Neville Dipale <ne...@gmail.com>
    Signed-off-by: Andrew Lamb <an...@nerdnetworks.org>
---
 rust/datafusion/src/datasource/memory.rs           |  7 +--
 rust/datafusion/src/execution/context.rs           | 21 +++----
 rust/datafusion/src/lib.rs                         | 18 +-----
 rust/datafusion/src/logical_plan/builder.rs        |  4 +-
 rust/datafusion/src/logical_plan/dfschema.rs       | 14 ++---
 rust/datafusion/src/optimizer/filter_push_down.rs  | 16 ++---
 .../src/optimizer/projection_push_down.rs          | 42 ++++++-------
 rust/datafusion/src/optimizer/utils.rs             |  8 +--
 .../src/physical_plan/array_expressions.rs         |  2 +-
 rust/datafusion/src/physical_plan/common.rs        |  6 +-
 rust/datafusion/src/physical_plan/csv.rs           |  7 +--
 .../src/physical_plan/datetime_expressions.rs      |  7 +--
 .../src/physical_plan/distinct_expressions.rs      | 28 ++++-----
 rust/datafusion/src/physical_plan/expressions.rs   | 23 ++++---
 rust/datafusion/src/physical_plan/functions.rs     |  2 +-
 .../datafusion/src/physical_plan/hash_aggregate.rs | 25 ++++----
 rust/datafusion/src/physical_plan/hash_utils.rs    |  6 +-
 rust/datafusion/src/physical_plan/limit.rs         |  4 +-
 rust/datafusion/src/physical_plan/mod.rs           | 36 +++++------
 rust/datafusion/src/physical_plan/planner.rs       |  6 +-
 .../src/physical_plan/string_expressions.rs        |  2 +-
 rust/datafusion/src/physical_plan/type_coercion.rs | 71 ++++++++--------------
 rust/datafusion/src/scalar.rs                      | 35 +++++------
 rust/datafusion/src/sql/parser.rs                  |  3 +-
 rust/datafusion/src/sql/planner.rs                 |  6 +-
 rust/datafusion/src/test/mod.rs                    |  5 +-
 26 files changed, 166 insertions(+), 238 deletions(-)

diff --git a/rust/datafusion/src/datasource/memory.rs b/rust/datafusion/src/datasource/memory.rs
index f00a381..e34e2d1 100644
--- a/rust/datafusion/src/datasource/memory.rs
+++ b/rust/datafusion/src/datasource/memory.rs
@@ -241,7 +241,7 @@ mod tests {
             Err(DataFusionError::Internal(e)) => {
                 assert_eq!("\"Projection index out of range\"", format!("{:?}", e))
             }
-            _ => assert!(false, "Scan should failed on invalid projection"),
+            _ => panic!("Scan should failed on invalid projection"),
         };
 
         Ok(())
@@ -275,10 +275,7 @@ mod tests {
                 "\"Mismatch between schema and batches\"",
                 format!("{:?}", e)
             ),
-            _ => assert!(
-                false,
-                "MemTable::new should have failed due to schema mismatch"
-            ),
+            _ => panic!("MemTable::new should have failed due to schema mismatch"),
         }
 
         Ok(())
diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index 2ecccb7..d74c7db 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -106,7 +106,7 @@ impl ExecutionContext {
 
     /// Create a new execution context using the provided configuration
     pub fn with_config(config: ExecutionConfig) -> Self {
-        let ctx = Self {
+        Self {
             state: ExecutionContextState {
                 datasources: HashMap::new(),
                 scalar_functions: HashMap::new(),
@@ -114,8 +114,7 @@ impl ExecutionContext {
                 aggregate_functions: HashMap::new(),
                 config,
             },
-        };
-        ctx
+        }
     }
 
     /// Get the configuration of this execution context
@@ -514,15 +513,11 @@ impl SchemaProvider for ExecutionContextState {
     }
 
     fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
-        self.scalar_functions
-            .get(name)
-            .and_then(|func| Some(func.clone()))
+        self.scalar_functions.get(name).cloned()
     }
 
     fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>> {
-        self.aggregate_functions
-            .get(name)
-            .and_then(|func| Some(func.clone()))
+        self.aggregate_functions.get(name).cloned()
     }
 }
 
@@ -676,9 +671,9 @@ mod tests {
                     assert_eq!(table_schema.fields().len(), 2);
                     assert_eq!(projected_schema.fields().len(), 1);
                 }
-                _ => assert!(false, "input to projection should be TableScan"),
+                _ => panic!("input to projection should be TableScan"),
             },
-            _ => assert!(false, "expect optimized_plan to be projection"),
+            _ => panic!("expect optimized_plan to be projection"),
         }
 
         let expected = "Projection: #c2\
@@ -754,9 +749,9 @@ mod tests {
                     assert_eq!(table_schema.fields().len(), 3);
                     assert_eq!(projected_schema.fields().len(), 1);
                 }
-                _ => assert!(false, "input to projection should be InMemoryScan"),
+                _ => panic!("input to projection should be InMemoryScan"),
             },
-            _ => assert!(false, "expect optimized_plan to be projection"),
+            _ => panic!("expect optimized_plan to be projection"),
         }
 
         let expected = "Projection: #b\
diff --git a/rust/datafusion/src/lib.rs b/rust/datafusion/src/lib.rs
index 586250a..7f01a6f 100644
--- a/rust/datafusion/src/lib.rs
+++ b/rust/datafusion/src/lib.rs
@@ -17,30 +17,14 @@
 #![warn(missing_docs)]
 // Clippy lints, some should be disabled incrementally
 #![allow(
-    clippy::assertions_on_constants,
-    clippy::bind_instead_of_map,
-    clippy::blocks_in_if_conditions,
-    clippy::collapsible_if,
-    clippy::explicit_counter_loop,
     clippy::field_reassign_with_default,
     clippy::float_cmp,
-    clippy::into_iter_on_ref,
-    clippy::len_zero,
-    clippy::let_and_return,
-    clippy::map_collect_result_unit,
-    clippy::match_like_matches_macro,
-    clippy::match_ref_pats,
     clippy::module_inception,
     clippy::needless_lifetimes,
     clippy::needless_range_loop,
-    clippy::needless_return,
     clippy::new_without_default,
     clippy::ptr_arg,
-    clippy::single_match,
-    clippy::stable_sort_primitive,
-    clippy::type_complexity,
-    clippy::unit_arg,
-    clippy::zero_prefixed_literal
+    clippy::type_complexity
 )]
 
 //! DataFusion is an extensible query execution framework that uses
diff --git a/rust/datafusion/src/logical_plan/builder.rs b/rust/datafusion/src/logical_plan/builder.rs
index 942de72..dc75814 100644
--- a/rust/datafusion/src/logical_plan/builder.rs
+++ b/rust/datafusion/src/logical_plan/builder.rs
@@ -344,7 +344,7 @@ fn validate_unique_names(
     input_schema: &DFSchema,
 ) -> Result<()> {
     let mut unique_names = HashMap::new();
-    expressions.iter().enumerate().map(|(position, expr)| {
+    expressions.iter().enumerate().try_for_each(|(position, expr)| {
         let name = expr.name(input_schema)?;
         match unique_names.get(&name) {
             None => {
@@ -361,7 +361,7 @@ fn validate_unique_names(
                 ))
             }
         }
-    }).collect::<Result<()>>()
+    })
 }
 
 #[cfg(test)]
diff --git a/rust/datafusion/src/logical_plan/dfschema.rs b/rust/datafusion/src/logical_plan/dfschema.rs
index 5d72df8..b6c1d21 100644
--- a/rust/datafusion/src/logical_plan/dfschema.rs
+++ b/rust/datafusion/src/logical_plan/dfschema.rs
@@ -55,13 +55,11 @@ impl DFSchema {
                         field.qualified_name()
                     )));
                 }
-            } else {
-                if !unqualified_names.insert(field.name()) {
-                    return Err(DataFusionError::Plan(format!(
-                        "Schema contains duplicate unqualified field name '{}'",
-                        field.name()
-                    )));
-                }
+            } else if !unqualified_names.insert(field.name()) {
+                return Err(DataFusionError::Plan(format!(
+                    "Schema contains duplicate unqualified field name '{}'",
+                    field.name()
+                )));
             }
         }
 
@@ -217,7 +215,7 @@ impl TryFrom<Schema> for DFSchema {
         Self::new(
             schema
                 .fields()
-                .into_iter()
+                .iter()
                 .map(|f| DFField {
                     field: f.clone(),
                     qualifier: None,
diff --git a/rust/datafusion/src/optimizer/filter_push_down.rs b/rust/datafusion/src/optimizer/filter_push_down.rs
index f095f8d..6a1e6d2 100644
--- a/rust/datafusion/src/optimizer/filter_push_down.rs
+++ b/rust/datafusion/src/optimizer/filter_push_down.rs
@@ -68,11 +68,10 @@ fn get_predicates<'a>(
         .filters
         .iter()
         .filter(|(_, columns)| {
-            columns
+            !columns
                 .intersection(used_columns)
                 .collect::<HashSet<_>>()
-                .len()
-                > 0
+                .is_empty()
         })
         .map(|&(ref a, ref b)| (a, b))
         .unzip()
@@ -330,7 +329,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
 
 impl OptimizerRule for FilterPushDown {
     fn name(&self) -> &str {
-        return "filter_push_down";
+        "filter_push_down"
     }
 
     fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
@@ -354,13 +353,10 @@ fn rewrite(expr: &Expr, projection: &HashMap<String, Expr>) -> Result<Expr> {
         .map(|e| rewrite(e, &projection))
         .collect::<Result<Vec<_>>>()?;
 
-    match expr {
-        Expr::Column(name) => {
-            if let Some(expr) = projection.get(name) {
-                return Ok(expr.clone());
-            }
+    if let Expr::Column(name) = expr {
+        if let Some(expr) = projection.get(name) {
+            return Ok(expr.clone());
         }
-        _ => {}
     }
 
     utils::rewrite_expression(&expr, &expressions)
diff --git a/rust/datafusion/src/optimizer/projection_push_down.rs b/rust/datafusion/src/optimizer/projection_push_down.rs
index 72b14bc..c1240d9 100644
--- a/rust/datafusion/src/optimizer/projection_push_down.rs
+++ b/rust/datafusion/src/optimizer/projection_push_down.rs
@@ -40,11 +40,11 @@ impl OptimizerRule for ProjectionPushDown {
             .iter()
             .map(|f| f.name().clone())
             .collect::<HashSet<String>>();
-        return optimize_plan(self, plan, &required_columns, false);
+        optimize_plan(self, plan, &required_columns, false)
     }
 
     fn name(&self) -> &str {
-        return "projection_push_down";
+        "projection_push_down"
     }
 }
 
@@ -95,7 +95,7 @@ fn get_projected_schema(
     }
 
     // sort the projection otherwise we get non-deterministic behavior
-    projection.sort();
+    projection.sort_unstable();
 
     // create the projected schema
     let mut projected_fields: Vec<DFField> = Vec::with_capacity(projection.len());
@@ -132,7 +132,7 @@ fn optimize_plan(
                 .fields()
                 .iter()
                 .enumerate()
-                .map(|(i, field)| {
+                .try_for_each(|(i, field)| {
                     if required_columns.contains(field.name()) {
                         new_expr.push(expr[i].clone());
                         new_fields.push(field.clone());
@@ -142,12 +142,11 @@ fn optimize_plan(
                     } else {
                         Ok(())
                     }
-                })
-                .collect::<Result<()>>()?;
+                })?;
 
             let new_input =
                 optimize_plan(optimizer, &input, &new_required_columns, true)?;
-            if new_fields.len() == 0 {
+            if new_fields.is_empty() {
                 // no need for an expression at all
                 Ok(new_input)
             } else {
@@ -203,22 +202,19 @@ fn optimize_plan(
 
             // Gather all columns needed for expressions in this Aggregate
             let mut new_aggr_expr = Vec::new();
-            aggr_expr
-                .iter()
-                .map(|expr| {
-                    let name = &expr.name(&schema)?;
-
-                    if required_columns.contains(name) {
-                        new_aggr_expr.push(expr.clone());
-                        new_required_columns.insert(name.clone());
-
-                        // add to the new set of required columns
-                        utils::expr_to_column_names(expr, &mut new_required_columns)
-                    } else {
-                        Ok(())
-                    }
-                })
-                .collect::<Result<()>>()?;
+            aggr_expr.iter().try_for_each(|expr| {
+                let name = &expr.name(&schema)?;
+
+                if required_columns.contains(name) {
+                    new_aggr_expr.push(expr.clone());
+                    new_required_columns.insert(name.clone());
+
+                    // add to the new set of required columns
+                    utils::expr_to_column_names(expr, &mut new_required_columns)
+                } else {
+                    Ok(())
+                }
+            })?;
 
             let new_schema = DFSchema::new(
                 schema
diff --git a/rust/datafusion/src/optimizer/utils.rs b/rust/datafusion/src/optimizer/utils.rs
index 7a7d89c..f482a17 100644
--- a/rust/datafusion/src/optimizer/utils.rs
+++ b/rust/datafusion/src/optimizer/utils.rs
@@ -438,7 +438,7 @@ mod tests {
         }
 
         fn name(&self) -> &str {
-            return "test_optimizer";
+            "test_optimizer"
         }
     }
 
@@ -476,11 +476,7 @@ mod tests {
                 ];
                 assert_eq!(*stringified_plans, expected_stringified_plans);
             }
-            _ => assert!(
-                false,
-                "Expected explain plan but got {:?}",
-                optimized_explain
-            ),
+            _ => panic!("Expected explain plan but got {:?}", optimized_explain),
         }
 
         Ok(())
diff --git a/rust/datafusion/src/physical_plan/array_expressions.rs b/rust/datafusion/src/physical_plan/array_expressions.rs
index 7bdb742..9af81ad 100644
--- a/rust/datafusion/src/physical_plan/array_expressions.rs
+++ b/rust/datafusion/src/physical_plan/array_expressions.rs
@@ -61,7 +61,7 @@ macro_rules! array {
 /// put values in an array.
 pub fn array(args: &[ArrayRef]) -> Result<ArrayRef> {
     // do not accept 0 arguments.
-    if args.len() == 0 {
+    if args.is_empty() {
         return Err(DataFusionError::Internal(
             "array requires at least one argument".to_string(),
         ));
diff --git a/rust/datafusion/src/physical_plan/common.rs b/rust/datafusion/src/physical_plan/common.rs
index 3c63a44..2b96c45 100644
--- a/rust/datafusion/src/physical_plan/common.rs
+++ b/rust/datafusion/src/physical_plan/common.rs
@@ -101,10 +101,8 @@ pub fn build_file_list(dir: &str, filenames: &mut Vec<String>, ext: &str) -> Res
             if let Some(path_name) = path.to_str() {
                 if path.is_dir() {
                     build_file_list(path_name, filenames, ext)?;
-                } else {
-                    if path_name.ends_with(ext) {
-                        filenames.push(path_name.to_string());
-                    }
+                } else if path_name.ends_with(ext) {
+                    filenames.push(path_name.to_string());
                 }
             } else {
                 return Err(DataFusionError::Plan("Invalid path".to_string()));
diff --git a/rust/datafusion/src/physical_plan/csv.rs b/rust/datafusion/src/physical_plan/csv.rs
index 857e874..54adb38 100644
--- a/rust/datafusion/src/physical_plan/csv.rs
+++ b/rust/datafusion/src/physical_plan/csv.rs
@@ -87,11 +87,8 @@ impl<'a> CsvReadOptions<'a> {
 
     /// Configure delimiter setting with Option, None value will be ignored
     pub fn delimiter_option(mut self, delimiter: Option<u8>) -> Self {
-        match delimiter {
-            Some(d) => {
-                self.delimiter = d;
-            }
-            _ => (),
+        if let Some(d) = delimiter {
+            self.delimiter = d;
         }
         self
     }
diff --git a/rust/datafusion/src/physical_plan/datetime_expressions.rs b/rust/datafusion/src/physical_plan/datetime_expressions.rs
index de86f5f..a12b00c 100644
--- a/rust/datafusion/src/physical_plan/datetime_expressions.rs
+++ b/rust/datafusion/src/physical_plan/datetime_expressions.rs
@@ -280,7 +280,7 @@ mod tests {
         // timezone the test machine is running. Thus it is still
         // somewhat suceptable to bugs in the use of chrono
         let naive_datetime = NaiveDateTime::new(
-            NaiveDate::from_ymd(2020, 09, 08),
+            NaiveDate::from_ymd(2020, 9, 8),
             NaiveTime::from_hms_nano(13, 42, 29, 190855),
         );
 
@@ -298,7 +298,7 @@ mod tests {
         // Also ensure that parsing timestamps with no fractional
         // second part works as well
         let naive_datetime_whole_secs = NaiveDateTime::new(
-            NaiveDate::from_ymd(2020, 09, 08),
+            NaiveDate::from_ymd(2020, 9, 8),
             NaiveTime::from_hms(13, 42, 29),
         );
 
@@ -342,8 +342,7 @@ mod tests {
 
     fn expect_timestamp_parse_error(s: &str, expected_err: &str) {
         match string_to_timestamp_nanos(s) {
-            Ok(v) => assert!(
-                false,
+            Ok(v) => panic!(
                 "Expected error '{}' while parsing '{}', but parsed {} instead",
                 expected_err, s, v
             ),
diff --git a/rust/datafusion/src/physical_plan/distinct_expressions.rs b/rust/datafusion/src/physical_plan/distinct_expressions.rs
index bbccc3b..9aa639a 100644
--- a/rust/datafusion/src/physical_plan/distinct_expressions.rs
+++ b/rust/datafusion/src/physical_plan/distinct_expressions.rs
@@ -124,7 +124,7 @@ impl Accumulator for DistinctCountAccumulator {
     }
 
     fn merge(&mut self, states: &Vec<ScalarValue>) -> Result<()> {
-        if states.len() == 0 {
+        if states.is_empty() {
             return Ok(());
         }
 
@@ -138,15 +138,13 @@ impl Accumulator for DistinctCountAccumulator {
             })
             .collect::<Result<Vec<_>>>()?;
 
-        (0..col_values[0].len())
-            .map(|row_index| {
-                let row_values = col_values
-                    .iter()
-                    .map(|col| col[row_index].clone())
-                    .collect::<Vec<_>>();
-                self.update(&row_values)
-            })
-            .collect::<Result<_>>()
+        (0..col_values[0].len()).try_for_each(|row_index| {
+            let row_values = col_values
+                .iter()
+                .map(|col| col[row_index].clone())
+                .collect::<Vec<_>>();
+            self.update(&row_values)
+        })
     }
 
     fn state(&self) -> Result<Vec<ScalarValue>> {
@@ -178,12 +176,10 @@ impl Accumulator for DistinctCountAccumulator {
     fn evaluate(&self) -> Result<ScalarValue> {
         match &self.count_data_type {
             DataType::UInt64 => Ok(ScalarValue::UInt64(Some(self.values.len() as u64))),
-            t => {
-                return Err(DataFusionError::Internal(format!(
-                    "Invalid data type {:?} for count distinct aggregation",
-                    t
-                )))
-            }
+            t => Err(DataFusionError::Internal(format!(
+                "Invalid data type {:?} for count distinct aggregation",
+                t
+            ))),
         }
     }
 }
diff --git a/rust/datafusion/src/physical_plan/expressions.rs b/rust/datafusion/src/physical_plan/expressions.rs
index c294eed..79045d9 100644
--- a/rust/datafusion/src/physical_plan/expressions.rs
+++ b/rust/datafusion/src/physical_plan/expressions.rs
@@ -1638,7 +1638,7 @@ impl fmt::Display for NotExpr {
 
 impl PhysicalExpr for NotExpr {
     fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
-        return Ok(DataType::Boolean);
+        Ok(DataType::Boolean)
     }
 
     fn nullable(&self, input_schema: &Schema) -> Result<bool> {
@@ -1791,7 +1791,7 @@ impl fmt::Display for IsNullExpr {
 }
 impl PhysicalExpr for IsNullExpr {
     fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
-        return Ok(DataType::Boolean);
+        Ok(DataType::Boolean)
     }
 
     fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
@@ -1836,7 +1836,7 @@ impl fmt::Display for IsNotNullExpr {
 }
 impl PhysicalExpr for IsNotNullExpr {
     fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
-        return Ok(DataType::Boolean);
+        Ok(DataType::Boolean)
     }
 
     fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
@@ -1911,7 +1911,7 @@ impl CaseExpr {
         when_then_expr: &[(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)],
         else_expr: Option<Arc<dyn PhysicalExpr>>,
     ) -> Result<Self> {
-        if when_then_expr.len() == 0 {
+        if when_then_expr.is_empty() {
             Err(DataFusionError::Execution(
                 "There must be at least one WHEN clause".to_string(),
             ))
@@ -2280,11 +2280,16 @@ pub struct CastExpr {
 
 /// Determine if a DataType is signed numeric or not
 pub fn is_signed_numeric(dt: &DataType) -> bool {
-    match dt {
-        DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => true,
-        DataType::Float16 | DataType::Float32 | DataType::Float64 => true,
-        _ => false,
-    }
+    matches!(
+        dt,
+        DataType::Int8
+            | DataType::Int16
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::Float16
+            | DataType::Float32
+            | DataType::Float64
+    )
 }
 
 /// Determine if a DataType is numeric or not
diff --git a/rust/datafusion/src/physical_plan/functions.rs b/rust/datafusion/src/physical_plan/functions.rs
index d296512..fb6c63b 100644
--- a/rust/datafusion/src/physical_plan/functions.rs
+++ b/rust/datafusion/src/physical_plan/functions.rs
@@ -180,7 +180,7 @@ pub fn return_type(
     // verify that this is a valid set of data types for this function
     data_types(&arg_types, &signature(fun))?;
 
-    if arg_types.len() == 0 {
+    if arg_types.is_empty() {
         // functions currently cannot be evaluated without arguments, as they can't
         // know the number of rows to return.
         return Err(DataFusionError::Plan(format!(
diff --git a/rust/datafusion/src/physical_plan/hash_aggregate.rs b/rust/datafusion/src/physical_plan/hash_aggregate.rs
index 27deb94..87ded23 100644
--- a/rust/datafusion/src/physical_plan/hash_aggregate.rs
+++ b/rust/datafusion/src/physical_plan/hash_aggregate.rs
@@ -290,11 +290,10 @@ fn group_aggregate_batch(
     // 2.5 clear indices
     accumulators
         .iter_mut()
-        // 2.1
-        .map(|(_, (_, accumulator_set, indices))| {
+        .try_for_each(|(_, (_, accumulator_set, indices))| {
             // 2.2
             accumulator_set
-                .into_iter()
+                .iter_mut()
                 .zip(&aggr_input_values)
                 .map(|(accumulator, aggr_array)| {
                     (
@@ -313,19 +312,19 @@ fn group_aggregate_batch(
                             .collect::<Vec<ArrayRef>>(),
                     )
                 })
-                // 2.4
-                .map(|(accumulator, values)| match mode {
+                .try_for_each(|(accumulator, values)| match mode {
                     AggregateMode::Partial => accumulator.update_batch(&values),
                     AggregateMode::Final => {
                         // note: the aggregation here is over states, not values, thus the merge
                         accumulator.merge_batch(&values)
                     }
                 })
-                .collect::<Result<()>>()
                 // 2.5
-                .and(Ok(indices.clear()))
-        })
-        .collect::<Result<()>>()?;
+                .and({
+                    indices.clear();
+                    Ok(())
+                })
+        })?;
     Ok(accumulators)
 }
 
@@ -684,7 +683,7 @@ fn create_batch_from_map(
         // 4.
         .collect::<ArrowResult<Vec<Vec<ArrayRef>>>>()?;
 
-    let batch = if arrays.len() != 0 {
+    let batch = if !arrays.is_empty() {
         // 5.
         let columns = concatenate(arrays)?;
         RecordBatch::try_new(Arc::new(output_schema.to_owned()), columns)?
@@ -716,8 +715,8 @@ fn finalize_aggregation(
                 .iter()
                 .map(|accumulator| accumulator.state())
                 .map(|value| {
-                    value.and_then(|e| {
-                        Ok(e.iter().map(|v| v.to_array()).collect::<Vec<ArrayRef>>())
+                    value.map(|e| {
+                        e.iter().map(|v| v.to_array()).collect::<Vec<ArrayRef>>()
                     })
                 })
                 .collect::<Result<Vec<_>>>()?;
@@ -727,7 +726,7 @@ fn finalize_aggregation(
             // merge the state to the final value
             accumulators
                 .iter()
-                .map(|accumulator| accumulator.evaluate().and_then(|v| Ok(v.to_array())))
+                .map(|accumulator| accumulator.evaluate().map(|v| v.to_array()))
                 .collect::<Result<Vec<ArrayRef>>>()
         }
     }
diff --git a/rust/datafusion/src/physical_plan/hash_utils.rs b/rust/datafusion/src/physical_plan/hash_utils.rs
index 03d37bd..5c13dfc 100644
--- a/rust/datafusion/src/physical_plan/hash_utils.rs
+++ b/rust/datafusion/src/physical_plan/hash_utils.rs
@@ -52,7 +52,7 @@ fn check_join_set_is_valid(
     right: &HashSet<String>,
     on: &JoinOn,
 ) -> Result<()> {
-    if on.len() == 0 {
+    if on.is_empty() {
         return Err(DataFusionError::Plan(
             "The 'on' clause of a join cannot be empty".to_string(),
         ));
@@ -63,7 +63,7 @@ fn check_join_set_is_valid(
     let on_right = &on.iter().map(|on| on.1.to_string()).collect::<HashSet<_>>();
     let right_missing = on_right.difference(right).collect::<HashSet<_>>();
 
-    if (left_missing.len() > 0) | (right_missing.len() > 0) {
+    if !left_missing.is_empty() | !right_missing.is_empty() {
         return Err(DataFusionError::Plan(format!(
                 "The left or right side of the join does not have all columns on \"on\": \nMissing on the left: {:?}\nMissing on the right: {:?}",
                 left_missing,
@@ -78,7 +78,7 @@ fn check_join_set_is_valid(
 
     let collisions = left.intersection(&remaining).collect::<HashSet<_>>();
 
-    if collisions.len() > 0 {
+    if !collisions.is_empty() {
         return Err(DataFusionError::Plan(format!(
                 "The left schema and the right schema have the following columns with the same name without being on the ON statement: {:?}. Consider aliasing them.",
                 collisions,
diff --git a/rust/datafusion/src/physical_plan/limit.rs b/rust/datafusion/src/physical_plan/limit.rs
index 78eb896..aebaf34 100644
--- a/rust/datafusion/src/physical_plan/limit.rs
+++ b/rust/datafusion/src/physical_plan/limit.rs
@@ -202,10 +202,10 @@ impl LimitStream {
 
     fn stream_limit(&mut self, batch: RecordBatch) -> Option<RecordBatch> {
         if self.current_len == self.limit {
-            return None;
+            None
         } else if self.current_len + batch.num_rows() <= self.limit {
             self.current_len += batch.num_rows();
-            return Some(batch);
+            Some(batch)
         } else {
             let batch_rows = self.limit - self.current_len;
             self.current_len = self.limit;
diff --git a/rust/datafusion/src/physical_plan/mod.rs b/rust/datafusion/src/physical_plan/mod.rs
index 8a5dfd9..b7cddbc 100644
--- a/rust/datafusion/src/physical_plan/mod.rs
+++ b/rust/datafusion/src/physical_plan/mod.rs
@@ -185,18 +185,16 @@ pub trait Accumulator: Send + Sync + Debug {
 
     /// updates the accumulator's state from a vector of arrays.
     fn update_batch(&mut self, values: &Vec<ArrayRef>) -> Result<()> {
-        if values.len() == 0 {
+        if values.is_empty() {
             return Ok(());
         };
-        (0..values[0].len())
-            .map(|index| {
-                let v = values
-                    .iter()
-                    .map(|array| ScalarValue::try_from_array(array, index))
-                    .collect::<Result<Vec<_>>>()?;
-                self.update(&v)
-            })
-            .collect::<Result<_>>()
+        (0..values[0].len()).try_for_each(|index| {
+            let v = values
+                .iter()
+                .map(|array| ScalarValue::try_from_array(array, index))
+                .collect::<Result<Vec<_>>>()?;
+            self.update(&v)
+        })
     }
 
     /// updates the accumulator's state from a vector of scalars.
@@ -204,18 +202,16 @@ pub trait Accumulator: Send + Sync + Debug {
 
     /// updates the accumulator's state from a vector of states.
     fn merge_batch(&mut self, states: &Vec<ArrayRef>) -> Result<()> {
-        if states.len() == 0 {
+        if states.is_empty() {
             return Ok(());
         };
-        (0..states[0].len())
-            .map(|index| {
-                let v = states
-                    .iter()
-                    .map(|array| ScalarValue::try_from_array(array, index))
-                    .collect::<Result<Vec<_>>>()?;
-                self.merge(&v)
-            })
-            .collect::<Result<_>>()
+        (0..states[0].len()).try_for_each(|index| {
+            let v = states
+                .iter()
+                .map(|array| ScalarValue::try_from_array(array, index))
+                .collect::<Result<Vec<_>>>()?;
+            self.merge(&v)
+        })
     }
 
     /// returns its value based on its current state.
diff --git a/rust/datafusion/src/physical_plan/planner.rs b/rust/datafusion/src/physical_plan/planner.rs
index 86af6f8..cae2e4e 100644
--- a/rust/datafusion/src/physical_plan/planner.rs
+++ b/rust/datafusion/src/physical_plan/planner.rs
@@ -105,7 +105,7 @@ impl DefaultPhysicalPlanner {
             .map(|child| self.optimize_plan(child.clone(), ctx_state))
             .collect::<Result<Vec<_>>>()?;
 
-        if children.len() == 0 {
+        if children.is_empty() {
             // leaf node, children cannot be replaced
             Ok(plan.clone())
         } else {
@@ -782,7 +782,7 @@ mod tests {
 
         let expected_error = "DefaultPhysicalPlanner does not know how to plan NoOp";
         match plan {
-            Ok(_) => assert!(false, "Expected planning failure"),
+            Ok(_) => panic!("Expected planning failure"),
             Err(e) => assert!(
                 e.to_string().contains(expected_error),
                 "Error '{}' did not contain expected error '{}'",
@@ -826,7 +826,7 @@ mod tests {
                 dict_is_ordered: false }\
         ], metadata: {} }";
         match plan {
-            Ok(_) => assert!(false, "Expected planning failure"),
+            Ok(_) => panic!("Expected planning failure"),
             Err(e) => assert!(
                 e.to_string().contains(expected_error),
                 "Error '{}' did not contain expected error '{}'",
diff --git a/rust/datafusion/src/physical_plan/string_expressions.rs b/rust/datafusion/src/physical_plan/string_expressions.rs
index ea70c8d..fb65f91 100644
--- a/rust/datafusion/src/physical_plan/string_expressions.rs
+++ b/rust/datafusion/src/physical_plan/string_expressions.rs
@@ -36,7 +36,7 @@ pub fn concatenate(args: &[ArrayRef]) -> Result<StringArray> {
     // downcast all arguments to strings
     let args = downcast_vec!(args, StringArray).collect::<Result<Vec<&StringArray>>>()?;
     // do not accept 0 arguments.
-    if args.len() == 0 {
+    if args.is_empty() {
         return Err(DataFusionError::Internal(
             "Concatenate was called with 0 arguments. It requires at least one."
                 .to_string(),
diff --git a/rust/datafusion/src/physical_plan/type_coercion.rs b/rust/datafusion/src/physical_plan/type_coercion.rs
index 55e24a7..91eaf65 100644
--- a/rust/datafusion/src/physical_plan/type_coercion.rs
+++ b/rust/datafusion/src/physical_plan/type_coercion.rs
@@ -149,50 +149,33 @@ fn maybe_data_types(
 pub fn can_coerce_from(type_into: &DataType, type_from: &DataType) -> bool {
     use self::DataType::*;
     match type_into {
-        Int8 => match type_from {
-            Int8 => true,
-            _ => false,
-        },
-        Int16 => match type_from {
-            Int8 | Int16 | UInt8 => true,
-            _ => false,
-        },
-        Int32 => match type_from {
-            Int8 | Int16 | Int32 | UInt8 | UInt16 => true,
-            _ => false,
-        },
-        Int64 => match type_from {
-            Int8 | Int16 | Int32 | Int64 | UInt8 | UInt16 | UInt32 => true,
-            _ => false,
-        },
-        UInt8 => match type_from {
-            UInt8 => true,
-            _ => false,
-        },
-        UInt16 => match type_from {
-            UInt8 | UInt16 => true,
-            _ => false,
-        },
-        UInt32 => match type_from {
-            UInt8 | UInt16 | UInt32 => true,
-            _ => false,
-        },
-        UInt64 => match type_from {
-            UInt8 | UInt16 | UInt32 | UInt64 => true,
-            _ => false,
-        },
-        Float32 => match type_from {
-            Int8 | Int16 | Int32 | Int64 => true,
-            UInt8 | UInt16 | UInt32 | UInt64 => true,
-            Float32 => true,
-            _ => false,
-        },
-        Float64 => match type_from {
-            Int8 | Int16 | Int32 | Int64 => true,
-            UInt8 | UInt16 | UInt32 | UInt64 => true,
-            Float32 | Float64 => true,
-            _ => false,
-        },
+        Int8 => matches!(type_from, Int8),
+        Int16 => matches!(type_from, Int8 | Int16 | UInt8),
+        Int32 => matches!(type_from, Int8 | Int16 | Int32 | UInt8 | UInt16),
+        Int64 => matches!(
+            type_from,
+            Int8 | Int16 | Int32 | Int64 | UInt8 | UInt16 | UInt32
+        ),
+        UInt8 => matches!(type_from, UInt8),
+        UInt16 => matches!(type_from, UInt8 | UInt16),
+        UInt32 => matches!(type_from, UInt8 | UInt16 | UInt32),
+        UInt64 => matches!(type_from, UInt8 | UInt16 | UInt32 | UInt64),
+        Float32 => matches!(
+            type_from,
+            Int8 | Int16 | Int32 | Int64 | UInt8 | UInt16 | UInt32 | UInt64 | Float32
+        ),
+        Float64 => matches!(
+            type_from,
+            Int8 | Int16
+                | Int32
+                | Int64
+                | UInt8
+                | UInt16
+                | UInt32
+                | UInt64
+                | Float32
+                | Float64
+        ),
         Utf8 => true,
         _ => false,
     }
diff --git a/rust/datafusion/src/scalar.rs b/rust/datafusion/src/scalar.rs
index d6d20b9..321271e 100644
--- a/rust/datafusion/src/scalar.rs
+++ b/rust/datafusion/src/scalar.rs
@@ -163,8 +163,7 @@ impl ScalarValue {
 
     /// whether this value is null or not.
     pub fn is_null(&self) -> bool {
-        match *self {
-            ScalarValue::Boolean(None)
+        matches!(*self, ScalarValue::Boolean(None)
             | ScalarValue::UInt8(None)
             | ScalarValue::UInt16(None)
             | ScalarValue::UInt32(None)
@@ -177,9 +176,7 @@ impl ScalarValue {
             | ScalarValue::Float64(None)
             | ScalarValue::Utf8(None)
             | ScalarValue::LargeUtf8(None)
-            | ScalarValue::List(None, _) => true,
-            _ => false,
-        }
+            | ScalarValue::List(None, _))
     }
 
     /// Converts a scalar value into an 1-row array.
@@ -391,20 +388,20 @@ impl TryFrom<&DataType> for ScalarValue {
 
     fn try_from(datatype: &DataType) -> Result<Self> {
         Ok(match datatype {
-            &DataType::Boolean => ScalarValue::Boolean(None),
-            &DataType::Float64 => ScalarValue::Float64(None),
-            &DataType::Float32 => ScalarValue::Float32(None),
-            &DataType::Int8 => ScalarValue::Int8(None),
-            &DataType::Int16 => ScalarValue::Int16(None),
-            &DataType::Int32 => ScalarValue::Int32(None),
-            &DataType::Int64 => ScalarValue::Int64(None),
-            &DataType::UInt8 => ScalarValue::UInt8(None),
-            &DataType::UInt16 => ScalarValue::UInt16(None),
-            &DataType::UInt32 => ScalarValue::UInt32(None),
-            &DataType::UInt64 => ScalarValue::UInt64(None),
-            &DataType::Utf8 => ScalarValue::Utf8(None),
-            &DataType::LargeUtf8 => ScalarValue::LargeUtf8(None),
-            &DataType::List(ref nested_type) => {
+            DataType::Boolean => ScalarValue::Boolean(None),
+            DataType::Float64 => ScalarValue::Float64(None),
+            DataType::Float32 => ScalarValue::Float32(None),
+            DataType::Int8 => ScalarValue::Int8(None),
+            DataType::Int16 => ScalarValue::Int16(None),
+            DataType::Int32 => ScalarValue::Int32(None),
+            DataType::Int64 => ScalarValue::Int64(None),
+            DataType::UInt8 => ScalarValue::UInt8(None),
+            DataType::UInt16 => ScalarValue::UInt16(None),
+            DataType::UInt32 => ScalarValue::UInt32(None),
+            DataType::UInt64 => ScalarValue::UInt64(None),
+            DataType::Utf8 => ScalarValue::Utf8(None),
+            DataType::LargeUtf8 => ScalarValue::LargeUtf8(None),
+            DataType::List(ref nested_type) => {
                 ScalarValue::List(None, nested_type.data_type().clone())
             }
             _ => {
diff --git a/rust/datafusion/src/sql/parser.rs b/rust/datafusion/src/sql/parser.rs
index 0324445..e71d1a8 100644
--- a/rust/datafusion/src/sql/parser.rs
+++ b/rust/datafusion/src/sql/parser.rs
@@ -337,8 +337,7 @@ mod tests {
     fn expect_parse_error(sql: &str, expected_error: &str) -> Result<(), ParserError> {
         match DFParser::parse_sql(sql) {
             Ok(statements) => {
-                assert!(
-                    false,
+                panic!(
                     "Expected parse error for '{}', but was successful: {:?}",
                     sql, statements
                 );
diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs
index dd375dd..562e580 100644
--- a/rust/datafusion/src/sql/planner.rs
+++ b/rust/datafusion/src/sql/planner.rs
@@ -369,7 +369,7 @@ impl<'a, S: SchemaProvider> SqlToRel<'a, S> {
                             join_keys.push((r.as_str(), l.as_str()));
                         }
                     }
-                    if join_keys.len() == 0 {
+                    if join_keys.is_empty() {
                         return Err(DataFusionError::NotImplemented(
                             "Cartesian joins are not supported".to_string(),
                         ));
@@ -419,7 +419,7 @@ impl<'a, S: SchemaProvider> SqlToRel<'a, S> {
             .collect();
 
         // apply projection or aggregate
-        let plan = if (select.group_by.len() > 0) | (aggr_expr.len() > 0) {
+        let plan = if !select.group_by.is_empty() | !aggr_expr.is_empty() {
             self.aggregate(&plan, projection_expr, &select.group_by, aggr_expr)?
         } else {
             self.project(&plan, projection_expr)?
@@ -505,7 +505,7 @@ impl<'a, S: SchemaProvider> SqlToRel<'a, S> {
         plan: &LogicalPlan,
         order_by: &Vec<OrderByExpr>,
     ) -> Result<LogicalPlan> {
-        if order_by.len() == 0 {
+        if order_by.is_empty() {
             return Ok(plan.clone());
         }
 
diff --git a/rust/datafusion/src/test/mod.rs b/rust/datafusion/src/test/mod.rs
index bfa931d..42bdf37 100644
--- a/rust/datafusion/src/test/mod.rs
+++ b/rust/datafusion/src/test/mod.rs
@@ -78,8 +78,7 @@ pub fn create_partitioned_csv(filename: &str, partitions: usize) -> Result<Strin
 
     let f = File::open(&path)?;
     let f = BufReader::new(f);
-    let mut i = 0;
-    for line in f.lines() {
+    for (i, line) in f.lines().enumerate() {
         let line = line.unwrap();
 
         if i == 0 {
@@ -94,8 +93,6 @@ pub fn create_partitioned_csv(filename: &str, partitions: usize) -> Result<Strin
             writers[partition].write_all(line.as_bytes()).unwrap();
             writers[partition].write_all(b"\n").unwrap();
         }
-
-        i += 1;
     }
     for w in writers.iter_mut() {
         w.flush().unwrap();