You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/10/22 13:04:46 UTC
[arrow-ballista] branch master updated: Return multiple tasks in poll_work based on free slots (#429)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new 988d3235 Return multiple tasks in poll_work based on free slots (#429)
988d3235 is described below
commit 988d323533f1e194ef04d263fed8af4464e060a2
Author: Daniël Heres <da...@gmail.com>
AuthorDate: Sat Oct 22 15:04:42 2022 +0200
Return multiple tasks in poll_work based on free slots (#429)
---
ballista/core/proto/ballista.proto | 4 +-
ballista/executor/src/execution_loop.rs | 18 ++++-----
ballista/scheduler/src/scheduler_server/grpc.rs | 54 +++++++++++--------------
3 files changed, 34 insertions(+), 42 deletions(-)
diff --git a/ballista/core/proto/ballista.proto b/ballista/core/proto/ballista.proto
index d473053b..b5992f03 100644
--- a/ballista/core/proto/ballista.proto
+++ b/ballista/core/proto/ballista.proto
@@ -781,7 +781,7 @@ message TaskStatus {
message PollWorkParams {
ExecutorRegistration metadata = 1;
- bool can_accept_task = 2;
+ uint32 num_free_slots = 2;
// All tasks must be reported until they reach the failed or completed state
repeated TaskStatus task_status = 3;
}
@@ -825,7 +825,7 @@ message JobSessionConfig {
}
message PollWorkResult {
- TaskDefinition task = 1;
+ repeated TaskDefinition tasks = 1;
}
message RegisterExecutorParams {
diff --git a/ballista/executor/src/execution_loop.rs b/ballista/executor/src/execution_loop.rs
index 990a0f89..4efbbe18 100644
--- a/ballista/executor/src/execution_loop.rs
+++ b/ballista/executor/src/execution_loop.rs
@@ -86,16 +86,19 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
> = scheduler
.poll_work(PollWorkParams {
metadata: Some(executor.metadata.clone()),
- can_accept_task: true,
+ num_free_slots: available_task_slots.available_permits() as u32,
task_status,
})
.await;
- let task_status_sender = task_status_sender.clone();
-
match poll_work_result {
Ok(result) => {
- if let Some(task) = result.into_inner().task {
+ let tasks = result.into_inner().tasks;
+ active_job = !tasks.is_empty();
+
+ for task in tasks {
+ let task_status_sender = task_status_sender.clone();
+
// Acquire a permit/slot for the task
let permit =
available_task_slots.clone().acquire_owned().await.unwrap();
@@ -110,16 +113,11 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
)
.await
{
- Ok(_) => {
- active_job = true;
- }
+ Ok(_) => {}
Err(e) => {
warn!("Failed to run task: {:?}", e);
- active_job = false;
}
}
- } else {
- active_job = false
}
}
Err(error) => {
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs
index e77e774b..14835c90 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -67,7 +67,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
let remote_addr = request.remote_addr();
if let PollWorkParams {
metadata: Some(metadata),
- can_accept_task,
+ num_free_slots,
task_status,
} = request.into_inner()
{
@@ -140,35 +140,29 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
Status::internal(msg)
})?;
- // If executor can accept another task, try and find one.
- let next_task = if can_accept_task {
- let reservations =
- vec![ExecutorReservation::new_free(metadata.id.clone())];
- if let Ok((mut assignments, _, _)) = self
- .state
- .task_manager
- .fill_reservations(&reservations)
- .await
- {
- if let Some((_, task)) = assignments.pop() {
- match self.state.task_manager.prepare_task_definition(task) {
- Ok(task_definition) => Some(task_definition),
- Err(e) => {
- error!("Error preparing task definition: {:?}", e);
- None
- }
+ // Find `num_free_slots` next tasks when available
+ let mut next_tasks = vec![];
+ let reservations = vec![
+ ExecutorReservation::new_free(metadata.id.clone());
+ num_free_slots as usize
+ ];
+ if let Ok((mut assignments, _, _)) = self
+ .state
+ .task_manager
+ .fill_reservations(&reservations)
+ .await
+ {
+ while let Some((_, task)) = assignments.pop() {
+ match self.state.task_manager.prepare_task_definition(task) {
+ Ok(task_definition) => next_tasks.push(task_definition),
+ Err(e) => {
+ error!("Error preparing task definition: {:?}", e);
}
- } else {
- None
}
- } else {
- None
}
- } else {
- None
- };
+ }
- Ok(Response::new(PollWorkResult { task: next_task }))
+ Ok(Response::new(PollWorkResult { tasks: next_tasks }))
} else {
warn!("Received invalid executor poll_work request");
Err(Status::invalid_argument("Missing metadata in request"))
@@ -594,7 +588,7 @@ mod test {
};
let request: Request<PollWorkParams> = Request::new(PollWorkParams {
metadata: Some(exec_meta.clone()),
- can_accept_task: false,
+ num_free_slots: 0,
task_status: vec![],
});
let response = scheduler
@@ -603,7 +597,7 @@ mod test {
.expect("Received error response")
.into_inner();
// no response task since we told the scheduler we didn't want to accept one
- assert!(response.task.is_none());
+ assert!(response.tasks.is_empty());
let state: SchedulerState<LogicalPlanNode, PhysicalPlanNode> =
SchedulerState::new_with_default_scheduler_name(
state_storage.clone(),
@@ -626,7 +620,7 @@ mod test {
let request: Request<PollWorkParams> = Request::new(PollWorkParams {
metadata: Some(exec_meta.clone()),
- can_accept_task: true,
+ num_free_slots: 1,
task_status: vec![],
});
let response = scheduler
@@ -636,7 +630,7 @@ mod test {
.into_inner();
// still no response task since there are no tasks in the scheduler
- assert!(response.task.is_none());
+ assert!(response.tasks.is_empty());
let state: SchedulerState<LogicalPlanNode, PhysicalPlanNode> =
SchedulerState::new_with_default_scheduler_name(
state_storage.clone(),