Posted to commits@arrow.apache.org by ag...@apache.org on 2022/10/22 13:04:46 UTC

[arrow-ballista] branch master updated: Return multiple tasks in poll_work based on free slots (#429)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new 988d3235 Return multiple tasks in poll_work based on free slots (#429)
988d3235 is described below

commit 988d323533f1e194ef04d263fed8af4464e060a2
Author: Daniël Heres <da...@gmail.com>
AuthorDate: Sat Oct 22 15:04:42 2022 +0200

    Return multiple tasks in poll_work based on free slots (#429)
---
 ballista/core/proto/ballista.proto              |  4 +-
 ballista/executor/src/execution_loop.rs         | 18 ++++-----
 ballista/scheduler/src/scheduler_server/grpc.rs | 54 +++++++++++--------------
 3 files changed, 34 insertions(+), 42 deletions(-)
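
In short: poll_work previously advertised at most one free slot per poll (a boolean) and received at most one task in return. With this change the executor reports its current count of free task slots and the scheduler hands back up to that many tasks in a single round trip.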

diff --git a/ballista/core/proto/ballista.proto b/ballista/core/proto/ballista.proto
index d473053b..b5992f03 100644
--- a/ballista/core/proto/ballista.proto
+++ b/ballista/core/proto/ballista.proto
@@ -781,7 +781,7 @@ message TaskStatus {
 
 message PollWorkParams {
   ExecutorRegistration metadata = 1;
-  bool can_accept_task = 2;
+  uint32 num_free_slots = 2;
   // All tasks must be reported until they reach the failed or completed state
   repeated TaskStatus task_status = 3;
 }
@@ -825,7 +825,7 @@ message JobSessionConfig {
 }
 
 message PollWorkResult {
-  TaskDefinition task = 1;
+  repeated TaskDefinition tasks = 1;
 }
 
 message RegisterExecutorParams {
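
For reference, the two changed messages map to roughly the following Rust types under standard prost proto3 codegen (a sketch only; prost derive attributes and the surrounding generated module are omitted):

    pub struct PollWorkParams {
        pub metadata: Option<ExecutorRegistration>,
        // Was `can_accept_task: bool`; now a count of free executor slots.
        pub num_free_slots: u32,
        pub task_status: Vec<TaskStatus>,
    }

    pub struct PollWorkResult {
        // Was `task: Option<TaskDefinition>`; now zero or more tasks.
        pub tasks: Vec<TaskDefinition>,
    }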
diff --git a/ballista/executor/src/execution_loop.rs b/ballista/executor/src/execution_loop.rs
index 990a0f89..4efbbe18 100644
--- a/ballista/executor/src/execution_loop.rs
+++ b/ballista/executor/src/execution_loop.rs
@@ -86,16 +86,19 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
         > = scheduler
             .poll_work(PollWorkParams {
                 metadata: Some(executor.metadata.clone()),
-                can_accept_task: true,
+                num_free_slots: available_task_slots.available_permits() as u32,
                 task_status,
             })
             .await;
 
-        let task_status_sender = task_status_sender.clone();
-
         match poll_work_result {
             Ok(result) => {
-                if let Some(task) = result.into_inner().task {
+                let tasks = result.into_inner().tasks;
+                active_job = !tasks.is_empty();
+
+                for task in tasks {
+                    let task_status_sender = task_status_sender.clone();
+
                     // Acquire a permit/slot for the task
                     let permit =
                         available_task_slots.clone().acquire_owned().await.unwrap();
@@ -110,16 +113,11 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
                     )
                     .await
                     {
-                        Ok(_) => {
-                            active_job = true;
-                        }
+                        Ok(_) => {}
                         Err(e) => {
                             warn!("Failed to run task: {:?}", e);
-                            active_job = false;
                         }
                     }
-                } else {
-                    active_job = false
                 }
             }
             Err(error) => {
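
The loop above advertises available_task_slots.available_permits() to the scheduler and then acquires one owned permit per returned task, so a slot is released only when its task finishes. A minimal, self-contained sketch of that permit pattern, with an invented slot count and task list standing in for the real executor state:

    use std::sync::Arc;
    use tokio::sync::Semaphore;

    #[tokio::main]
    async fn main() {
        // Stand-in for the executor's available_task_slots pool (4 is arbitrary).
        let slots = Arc::new(Semaphore::new(4));

        // What the executor would report in PollWorkParams.num_free_slots.
        let num_free_slots = slots.available_permits() as u32;
        println!("polling with num_free_slots = {num_free_slots}");

        // Pretend the scheduler returned this many task definitions.
        let tasks = vec!["task-a", "task-b"];

        let mut handles = vec![];
        for task in tasks {
            // One owned permit per task; holding it keeps the slot occupied.
            let permit = slots.clone().acquire_owned().await.unwrap();
            handles.push(tokio::spawn(async move {
                println!("running {task}");
                // ... run the task, send status updates ...
                drop(permit); // slot becomes free for the next poll
            }));
        }
        for h in handles {
            h.await.unwrap();
        }
    }

Because each permit is acquired before the task is spawned, a poll cannot over-commit the executor: if the scheduler ever returned more tasks than slots, acquire_owned would simply wait until a slot frees up.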
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs
index e77e774b..14835c90 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -67,7 +67,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
         let remote_addr = request.remote_addr();
         if let PollWorkParams {
             metadata: Some(metadata),
-            can_accept_task,
+            num_free_slots,
             task_status,
         } = request.into_inner()
         {
@@ -140,35 +140,29 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
                     Status::internal(msg)
                 })?;
 
-            // If executor can accept another task, try and find one.
-            let next_task = if can_accept_task {
-                let reservations =
-                    vec![ExecutorReservation::new_free(metadata.id.clone())];
-                if let Ok((mut assignments, _, _)) = self
-                    .state
-                    .task_manager
-                    .fill_reservations(&reservations)
-                    .await
-                {
-                    if let Some((_, task)) = assignments.pop() {
-                        match self.state.task_manager.prepare_task_definition(task) {
-                            Ok(task_definition) => Some(task_definition),
-                            Err(e) => {
-                                error!("Error preparing task definition: {:?}", e);
-                                None
-                            }
+            // Find `num_free_slots` next tasks when available
+            let mut next_tasks = vec![];
+            let reservations = vec![
+                ExecutorReservation::new_free(metadata.id.clone());
+                num_free_slots as usize
+            ];
+            if let Ok((mut assignments, _, _)) = self
+                .state
+                .task_manager
+                .fill_reservations(&reservations)
+                .await
+            {
+                while let Some((_, task)) = assignments.pop() {
+                    match self.state.task_manager.prepare_task_definition(task) {
+                        Ok(task_definition) => next_tasks.push(task_definition),
+                        Err(e) => {
+                            error!("Error preparing task definition: {:?}", e);
                         }
-                    } else {
-                        None
                     }
-                } else {
-                    None
                 }
-            } else {
-                None
-            };
+            }
 
-            Ok(Response::new(PollWorkResult { task: next_task }))
+            Ok(Response::new(PollWorkResult { tasks: next_tasks }))
         } else {
             warn!("Received invalid executor poll_work request");
             Err(Status::invalid_argument("Missing metadata in request"))
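
One detail worth noting in the scheduler change: vec![elem; n] clones its element, so building one free reservation per advertised slot relies on ExecutorReservation implementing Clone. A pared-down stand-in type illustrating just that construction (the real Ballista type carries more state):

    #[derive(Clone, Debug)]
    struct ExecutorReservation {
        executor_id: String,
    }

    impl ExecutorReservation {
        fn new_free(executor_id: String) -> Self {
            Self { executor_id }
        }
    }

    fn main() {
        let num_free_slots: u32 = 3;
        // The macro clones the single constructed value num_free_slots times.
        let reservations = vec![
            ExecutorReservation::new_free("executor-1".to_string());
            num_free_slots as usize
        ];
        assert_eq!(reservations.len(), 3);
    }

If fewer tasks are runnable than reservations were requested, fill_reservations returns fewer assignments and the response simply carries a shorter tasks list.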
@@ -594,7 +588,7 @@ mod test {
         };
         let request: Request<PollWorkParams> = Request::new(PollWorkParams {
             metadata: Some(exec_meta.clone()),
-            can_accept_task: false,
+            num_free_slots: 0,
             task_status: vec![],
         });
         let response = scheduler
@@ -603,7 +597,7 @@ mod test {
             .expect("Received error response")
             .into_inner();
         // no response task since we told the scheduler we didn't want to accept one
-        assert!(response.task.is_none());
+        assert!(response.tasks.is_empty());
         let state: SchedulerState<LogicalPlanNode, PhysicalPlanNode> =
             SchedulerState::new_with_default_scheduler_name(
                 state_storage.clone(),
@@ -626,7 +620,7 @@ mod test {
 
         let request: Request<PollWorkParams> = Request::new(PollWorkParams {
             metadata: Some(exec_meta.clone()),
-            can_accept_task: true,
+            num_free_slots: 1,
             task_status: vec![],
         });
         let response = scheduler
@@ -636,7 +630,7 @@ mod test {
             .into_inner();
 
         // still no response task since there are no tasks in the scheduler
-        assert!(response.task.is_none());
+        assert!(response.tasks.is_empty());
         let state: SchedulerState<LogicalPlanNode, PhysicalPlanNode> =
             SchedulerState::new_with_default_scheduler_name(
                 state_storage.clone(),