Posted to commits@arrow.apache.org by nj...@apache.org on 2022/12/19 05:03:16 UTC
[arrow-ballista] branch master updated: Fix cargo clippy (#571)
This is an automated email from the ASF dual-hosted git repository.
nju_yaho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new 908aa5a1 Fix cargo clippy (#571)
908aa5a1 is described below
commit 908aa5a10d2efe9b64b3d165ce4116a928fefc60
Author: yahoNanJing <90...@users.noreply.github.com>
AuthorDate: Mon Dec 19 13:03:11 2022 +0800
Fix cargo clippy (#571)
Co-authored-by: yangzhong <ya...@ebay.com>
---
ballista/core/src/execution_plans/shuffle_reader.rs | 2 +-
ballista/core/src/plugin/mod.rs | 2 +-
ballista/executor/src/executor_server.rs | 8 ++++----
ballista/scheduler/src/state/execution_graph.rs | 10 ++++------
benchmarks/src/bin/tpch.rs | 6 +++---
5 files changed, 13 insertions(+), 15 deletions(-)
diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs b/ballista/core/src/execution_plans/shuffle_reader.rs
index 5411978f..97e24fd6 100644
--- a/ballista/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/core/src/execution_plans/shuffle_reader.rs
@@ -364,7 +364,7 @@ async fn fetch_partition_remote(
// TODO for shuffle client connections, we should avoid creating new connections again and again.
// And we should also avoid keeping too many connections alive for a long time.
let host = metadata.host.as_str();
- let port = metadata.port as u16;
+ let port = metadata.port;
let mut ballista_client =
BallistaClient::try_new(host, port)
.await
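
[Editor's note] This hunk silences clippy's unnecessary_cast lint: the removed `as u16` implies the port field is already a u16, so the cast was a no-op. A minimal sketch with a stand-in type, not the actual Ballista definitions:

    // Stand-in for the metadata type; the u16 port is an assumption
    // inferred from the removed cast.
    struct ExecutorMetadata {
        host: String,
        port: u16,
    }

    fn connect_target(metadata: &ExecutorMetadata) -> (String, u16) {
        let host = metadata.host.clone();
        // clippy::unnecessary_cast: the field is already a u16, so
        // `metadata.port as u16` would be a no-op cast.
        let port = metadata.port;
        (host, port)
    }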
diff --git a/ballista/core/src/plugin/mod.rs b/ballista/core/src/plugin/mod.rs
index 3579c546..d1881713 100644
--- a/ballista/core/src/plugin/mod.rs
+++ b/ballista/core/src/plugin/mod.rs
@@ -50,7 +50,7 @@ impl PluginEnum {
/// Create a new struct which implements the PluginRegistrar trait
pub fn init_plugin_manager(&self) -> Box<dyn PluginRegistrar> {
match self {
- PluginEnum::UDF => Box::new(UDFPluginManager::default()),
+ PluginEnum::UDF => Box::<UDFPluginManager>::default(),
}
}
}
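
[Editor's note] This is clippy's box_default lint: `Box<T>` implements Default whenever `T` does, so `Box::<T>::default()` is preferred over boxing a freshly defaulted value. A sketch with a stand-in type, assuming UDFPluginManager implements Default as the original call site implies:

    #[derive(Default)]
    struct UDFPluginManager;

    fn init_plugin_manager() -> Box<UDFPluginManager> {
        // clippy::box_default: prefer the Box's own Default impl
        // over Box::new(UDFPluginManager::default()).
        Box::<UDFPluginManager>::default()
    }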
diff --git a/ballista/executor/src/executor_server.rs b/ballista/executor/src/executor_server.rs
index da7fb74f..de4696d2 100644
--- a/ballista/executor/src/executor_server.rs
+++ b/ballista/executor/src/executor_server.rs
@@ -342,12 +342,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
let partition_id = task.partition_id;
let shuffle_writer_plan =
self.executor
- .new_shuffle_writer(job_id.clone(), stage_id as usize, plan)?;
+ .new_shuffle_writer(job_id.clone(), stage_id, plan)?;
let part = PartitionId {
job_id: job_id.clone(),
- stage_id: stage_id as usize,
- partition_id: partition_id as usize,
+ stage_id,
+ partition_id,
};
info!("Start to execute shuffle write for task {}", task_identity);
@@ -355,7 +355,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
let execution_result = self
.executor
.execute_shuffle_write(
- task_id as usize,
+ task_id,
part.clone(),
shuffle_writer_plan.clone(),
task_context,
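
[Editor's note] Both hunks drop casts that clippy's unnecessary_cast lint flags as no-ops: `stage_id`, `partition_id`, and `task_id` are presumably already usize at these call sites, which also lets the struct literal use field init shorthand. A sketch with stand-in types:

    struct PartitionId {
        job_id: String,
        stage_id: usize,
        partition_id: usize,
    }

    // Once the arguments are already usize, the `as usize` casts are
    // redundant and the shorthand form is idiomatic.
    fn make_partition_id(job_id: String, stage_id: usize, partition_id: usize) -> PartitionId {
        PartitionId {
            job_id,
            stage_id,      // was `stage_id: stage_id as usize`
            partition_id,  // was `partition_id: partition_id as usize`
        }
    }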
diff --git a/ballista/scheduler/src/state/execution_graph.rs b/ballista/scheduler/src/state/execution_graph.rs
index 6f9b6545..976ec5f5 100644
--- a/ballista/scheduler/src/state/execution_graph.rs
+++ b/ballista/scheduler/src/state/execution_graph.rs
@@ -296,7 +296,6 @@ impl ExecutionGraph {
let mut locations = vec![];
for task_status in stage_task_statuses.into_iter() {
{
- let stage_id = stage_id as usize;
let task_stage_attempt_num =
task_status.stage_attempt_num as usize;
if task_stage_attempt_num < running_stage.stage_attempt_num {
@@ -481,7 +480,6 @@ impl ExecutionGraph {
);
} else if let ExecutionStage::UnResolved(unsolved_stage) = stage {
for task_status in stage_task_statuses.into_iter() {
- let stage_id = stage_id as usize;
let task_stage_attempt_num =
task_status.stage_attempt_num as usize;
let partition_id = task_status.clone().partition_id as usize;
@@ -815,8 +813,8 @@ impl ExecutionGraph {
/// Total number of tasks in this plan that are ready for scheduling
pub fn available_tasks(&self) -> usize {
self.stages
- .iter()
- .map(|(_, stage)| {
+ .values()
+ .map(|stage| {
if let ExecutionStage::Running(stage) = stage {
stage.available_tasks()
} else {
@@ -1412,8 +1410,8 @@ impl Debug for ExecutionGraph {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let stages = self
.stages
- .iter()
- .map(|(_, stage)| format!("{:?}", stage))
+ .values()
+ .map(|stage| format!("{:?}", stage))
.collect::<Vec<String>>()
.join("");
write!(f, "ExecutionGraph[job_id={}, session_id={}, available_tasks={}, is_successful={}]\n{}",
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 10ff91ad..aabfea6f 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -330,7 +330,7 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB
result = execute_query(&ctx, &plan, opt.debug).await?;
}
let elapsed = start.elapsed().as_secs_f64() * 1000.0;
- millis.push(elapsed as f64);
+ millis.push(elapsed);
let row_count = result.iter().map(|b| b.num_rows()).sum();
println!(
"Query {} iteration {} took {:.1} ms and returned {} rows",
@@ -405,7 +405,7 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
.unwrap();
}
let elapsed = start.elapsed().as_secs_f64() * 1000.0;
- millis.push(elapsed as f64);
+ millis.push(elapsed);
let row_count = batches.iter().map(|b| b.num_rows()).sum();
println!(
"Query {} iteration {} took {:.1} ms and returned {} rows",
@@ -556,7 +556,7 @@ fn get_query_sql_by_path(query: usize, mut sql_path: String) -> Result<String> {
}
if query > 0 && query < 23 {
let filename = format!("{}/q{}.sql", sql_path, query);
- Ok(fs::read_to_string(&filename).expect("failed to read query"))
+ Ok(fs::read_to_string(filename).expect("failed to read query"))
} else {
Err(DataFusionError::Plan(
"invalid query. Expected value between 1 and 22".to_owned(),