You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2022/10/17 19:27:18 UTC
[arrow-ballista] branch master updated: Avoid calling scheduler when the executor cannot accept new tasks (#378)
This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new f2d6dfe7 Avoid calling scheduler when the executor cannot accept new tasks (#378)
f2d6dfe7 is described below
commit f2d6dfe7888f75a52e3237a7a4958781339d797f
Author: Daniƫl Heres <da...@gmail.com>
AuthorDate: Mon Oct 17 21:27:13 2022 +0200
Avoid calling scheduler when the executor cannot accept new tasks (#378)
* Check whether we can accept tasks before polling for work
* Fix order
* Fmt
* Simplify loop
---
ballista/executor/src/execution_loop.rs | 26 ++++++++++++++------------
1 file changed, 14 insertions(+), 12 deletions(-)
diff --git a/ballista/executor/src/execution_loop.rs b/ballista/executor/src/execution_loop.rs
index 3afe9b91..37c3ef3a 100644
--- a/ballista/executor/src/execution_loop.rs
+++ b/ballista/executor/src/execution_loop.rs
@@ -63,20 +63,24 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
info!("Starting poll work loop with scheduler");
loop {
+ let can_accept_task = available_tasks_slots.load(Ordering::SeqCst) > 0;
+
+ // Don't poll for work if we can not accept any tasks
+ if !can_accept_task {
+ tokio::time::sleep(Duration::from_millis(1)).await;
+ continue;
+ }
+
let task_status: Vec<TaskStatus> =
sample_tasks_status(&mut task_status_receiver).await;
- // Keeps track of whether we received task in last iteration
- // to avoid going in sleep mode between polling
- let mut active_job = false;
-
let poll_work_result: anyhow::Result<
tonic::Response<PollWorkResult>,
tonic::Status,
> = scheduler
.poll_work(PollWorkParams {
metadata: Some(executor.metadata.clone()),
- can_accept_task: available_tasks_slots.load(Ordering::SeqCst) > 0,
+ can_accept_task,
task_status,
})
.await;
@@ -95,23 +99,21 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
)
.await
{
- Ok(_) => {
- active_job = true;
- }
+ Ok(_) => {}
Err(e) => {
warn!("Failed to run task: {:?}", e);
- active_job = false;
}
}
- } else {
- active_job = false;
}
}
Err(error) => {
warn!("Executor poll work loop failed. If this continues to happen the Scheduler might be marked as dead. Error: {}", error);
}
}
- if !active_job {
+
+ if available_tasks_slots.load(Ordering::SeqCst)
+ == executor_specification.task_slots as usize
+ {
tokio::time::sleep(Duration::from_millis(100)).await;
}
}