You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by nj...@apache.org on 2022/12/19 05:03:16 UTC

[arrow-ballista] branch master updated: Fix cargo clippy (#571)

This is an automated email from the ASF dual-hosted git repository.

nju_yaho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new 908aa5a1 Fix cargo clippy (#571)
908aa5a1 is described below

commit 908aa5a10d2efe9b64b3d165ce4116a928fefc60
Author: yahoNanJing <90...@users.noreply.github.com>
AuthorDate: Mon Dec 19 13:03:11 2022 +0800

    Fix cargo clippy (#571)
    
    Co-authored-by: yangzhong <ya...@ebay.com>
---
 ballista/core/src/execution_plans/shuffle_reader.rs |  2 +-
 ballista/core/src/plugin/mod.rs                     |  2 +-
 ballista/executor/src/executor_server.rs            |  8 ++++----
 ballista/scheduler/src/state/execution_graph.rs     | 10 ++++------
 benchmarks/src/bin/tpch.rs                          |  6 +++---
 5 files changed, 13 insertions(+), 15 deletions(-)

diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs b/ballista/core/src/execution_plans/shuffle_reader.rs
index 5411978f..97e24fd6 100644
--- a/ballista/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/core/src/execution_plans/shuffle_reader.rs
@@ -364,7 +364,7 @@ async fn fetch_partition_remote(
     // TODO for shuffle client connections, we should avoid creating new connections again and again.
     // And we should also avoid to keep alive too many connections for long time.
     let host = metadata.host.as_str();
-    let port = metadata.port as u16;
+    let port = metadata.port;
     let mut ballista_client =
         BallistaClient::try_new(host, port)
             .await
diff --git a/ballista/core/src/plugin/mod.rs b/ballista/core/src/plugin/mod.rs
index 3579c546..d1881713 100644
--- a/ballista/core/src/plugin/mod.rs
+++ b/ballista/core/src/plugin/mod.rs
@@ -50,7 +50,7 @@ impl PluginEnum {
     /// new a struct which impl the PluginRegistrar trait
     pub fn init_plugin_manager(&self) -> Box<dyn PluginRegistrar> {
         match self {
-            PluginEnum::UDF => Box::new(UDFPluginManager::default()),
+            PluginEnum::UDF => Box::<UDFPluginManager>::default(),
         }
     }
 }
diff --git a/ballista/executor/src/executor_server.rs b/ballista/executor/src/executor_server.rs
index da7fb74f..de4696d2 100644
--- a/ballista/executor/src/executor_server.rs
+++ b/ballista/executor/src/executor_server.rs
@@ -342,12 +342,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
         let partition_id = task.partition_id;
         let shuffle_writer_plan =
             self.executor
-                .new_shuffle_writer(job_id.clone(), stage_id as usize, plan)?;
+                .new_shuffle_writer(job_id.clone(), stage_id, plan)?;
 
         let part = PartitionId {
             job_id: job_id.clone(),
-            stage_id: stage_id as usize,
-            partition_id: partition_id as usize,
+            stage_id,
+            partition_id,
         };
 
         info!("Start to execute shuffle write for task {}", task_identity);
@@ -355,7 +355,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
         let execution_result = self
             .executor
             .execute_shuffle_write(
-                task_id as usize,
+                task_id,
                 part.clone(),
                 shuffle_writer_plan.clone(),
                 task_context,
diff --git a/ballista/scheduler/src/state/execution_graph.rs b/ballista/scheduler/src/state/execution_graph.rs
index 6f9b6545..976ec5f5 100644
--- a/ballista/scheduler/src/state/execution_graph.rs
+++ b/ballista/scheduler/src/state/execution_graph.rs
@@ -296,7 +296,6 @@ impl ExecutionGraph {
                     let mut locations = vec![];
                     for task_status in stage_task_statuses.into_iter() {
                         {
-                            let stage_id = stage_id as usize;
                             let task_stage_attempt_num =
                                 task_status.stage_attempt_num as usize;
                             if task_stage_attempt_num < running_stage.stage_attempt_num {
@@ -481,7 +480,6 @@ impl ExecutionGraph {
                     );
                 } else if let ExecutionStage::UnResolved(unsolved_stage) = stage {
                     for task_status in stage_task_statuses.into_iter() {
-                        let stage_id = stage_id as usize;
                         let task_stage_attempt_num =
                             task_status.stage_attempt_num as usize;
                         let partition_id = task_status.clone().partition_id as usize;
@@ -815,8 +813,8 @@ impl ExecutionGraph {
     /// Total number of tasks in this plan that are ready for scheduling
     pub fn available_tasks(&self) -> usize {
         self.stages
-            .iter()
-            .map(|(_, stage)| {
+            .values()
+            .map(|stage| {
                 if let ExecutionStage::Running(stage) = stage {
                     stage.available_tasks()
                 } else {
@@ -1412,8 +1410,8 @@ impl Debug for ExecutionGraph {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         let stages = self
             .stages
-            .iter()
-            .map(|(_, stage)| format!("{:?}", stage))
+            .values()
+            .map(|stage| format!("{:?}", stage))
             .collect::<Vec<String>>()
             .join("");
         write!(f, "ExecutionGraph[job_id={}, session_id={}, available_tasks={}, is_successful={}]\n{}",
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 10ff91ad..aabfea6f 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -330,7 +330,7 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB
             result = execute_query(&ctx, &plan, opt.debug).await?;
         }
         let elapsed = start.elapsed().as_secs_f64() * 1000.0;
-        millis.push(elapsed as f64);
+        millis.push(elapsed);
         let row_count = result.iter().map(|b| b.num_rows()).sum();
         println!(
             "Query {} iteration {} took {:.1} ms and returned {} rows",
@@ -405,7 +405,7 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
                 .unwrap();
         }
         let elapsed = start.elapsed().as_secs_f64() * 1000.0;
-        millis.push(elapsed as f64);
+        millis.push(elapsed);
         let row_count = batches.iter().map(|b| b.num_rows()).sum();
         println!(
             "Query {} iteration {} took {:.1} ms and returned {} rows",
@@ -556,7 +556,7 @@ fn get_query_sql_by_path(query: usize, mut sql_path: String) -> Result<String> {
     }
     if query > 0 && query < 23 {
         let filename = format!("{}/q{}.sql", sql_path, query);
-        Ok(fs::read_to_string(&filename).expect("failed to read query"))
+        Ok(fs::read_to_string(filename).expect("failed to read query"))
     } else {
         Err(DataFusionError::Plan(
             "invalid query. Expected value between 1 and 22".to_owned(),