You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2022/10/17 19:27:18 UTC

[arrow-ballista] branch master updated: Avoid calling scheduler when the executor cannot accept new tasks (#378)

This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new f2d6dfe7 Avoid calling scheduler when the executor cannot accept new tasks (#378)
f2d6dfe7 is described below

commit f2d6dfe7888f75a52e3237a7a4958781339d797f
Author: Daniël Heres <da...@gmail.com>
AuthorDate: Mon Oct 17 21:27:13 2022 +0200

    Avoid calling scheduler when the executor cannot accept new tasks (#378)
    
    * Check whether we can accept tasks before polling for work
    
    * Fix order
    
    * Fmt
    
    * Simplify loop
---
 ballista/executor/src/execution_loop.rs | 26 ++++++++++++++------------
 1 file changed, 14 insertions(+), 12 deletions(-)

diff --git a/ballista/executor/src/execution_loop.rs b/ballista/executor/src/execution_loop.rs
index 3afe9b91..37c3ef3a 100644
--- a/ballista/executor/src/execution_loop.rs
+++ b/ballista/executor/src/execution_loop.rs
@@ -63,20 +63,24 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
     info!("Starting poll work loop with scheduler");
 
     loop {
+        let can_accept_task = available_tasks_slots.load(Ordering::SeqCst) > 0;
+
+        // Don't poll for work if we can not accept any tasks
+        if !can_accept_task {
+            tokio::time::sleep(Duration::from_millis(1)).await;
+            continue;
+        }
+
         let task_status: Vec<TaskStatus> =
             sample_tasks_status(&mut task_status_receiver).await;
 
-        // Keeps track of whether we received task in last iteration
-        // to avoid going in sleep mode between polling
-        let mut active_job = false;
-
         let poll_work_result: anyhow::Result<
             tonic::Response<PollWorkResult>,
             tonic::Status,
         > = scheduler
             .poll_work(PollWorkParams {
                 metadata: Some(executor.metadata.clone()),
-                can_accept_task: available_tasks_slots.load(Ordering::SeqCst) > 0,
+                can_accept_task,
                 task_status,
             })
             .await;
@@ -95,23 +99,21 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
                     )
                     .await
                     {
-                        Ok(_) => {
-                            active_job = true;
-                        }
+                        Ok(_) => {}
                         Err(e) => {
                             warn!("Failed to run task: {:?}", e);
-                            active_job = false;
                         }
                     }
-                } else {
-                    active_job = false;
                 }
             }
             Err(error) => {
                 warn!("Executor poll work loop failed. If this continues to happen the Scheduler might be marked as dead. Error: {}", error);
             }
         }
-        if !active_job {
+
+        if available_tasks_slots.load(Ordering::SeqCst)
+            == executor_specification.task_slots as usize
+        {
             tokio::time::sleep(Duration::from_millis(100)).await;
         }
     }