You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2022/10/20 10:54:16 UTC
[arrow-ballista] branch master updated: Pull-based execution loop improvements (#380)
This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new f58d7191 Pull-based execution loop improvements (#380)
f58d7191 is described below
commit f58d7191696bb78b6d687b68f01c07d6030d1816
Author: Daniël Heres <da...@gmail.com>
AuthorDate: Thu Oct 20 12:54:12 2022 +0200
Pull-based execution loop improvements (#380)
* Use dedicated executor in execution loop
* Switch to semaphore
* Add line
* Move before spawn
* Move
* Move after send
* Move acquire in the loop
* Lint, naming
---
ballista/executor/src/execution_loop.rs | 47 ++++++++++++++++++++-------------
1 file changed, 28 insertions(+), 19 deletions(-)
diff --git a/ballista/executor/src/execution_loop.rs b/ballista/executor/src/execution_loop.rs
index e6ad3109..990a0f89 100644
--- a/ballista/executor/src/execution_loop.rs
+++ b/ballista/executor/src/execution_loop.rs
@@ -21,7 +21,9 @@ use ballista_core::serde::protobuf::{
scheduler_grpc_client::SchedulerGrpcClient, PollWorkParams, PollWorkResult,
TaskDefinition, TaskStatus,
};
+use tokio::sync::{OwnedSemaphorePermit, Semaphore};
+use crate::cpu_bound_executor::DedicatedExecutor;
use crate::executor::Executor;
use crate::{as_task_status, TaskExecutionTimes};
use ballista_core::error::BallistaError;
@@ -38,7 +40,6 @@ use std::collections::HashMap;
use std::convert::TryInto;
use std::error::Error;
use std::ops::Deref;
-use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{Receiver, Sender, TryRecvError};
use std::time::{SystemTime, UNIX_EPOCH};
use std::{sync::Arc, time::Duration};
@@ -56,25 +57,26 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
.unwrap()
.clone()
.into();
- let available_tasks_slots =
- Arc::new(AtomicUsize::new(executor_specification.task_slots as usize));
+ let available_task_slots =
+ Arc::new(Semaphore::new(executor_specification.task_slots as usize));
+
let (task_status_sender, mut task_status_receiver) =
std::sync::mpsc::channel::<TaskStatus>();
info!("Starting poll work loop with scheduler");
+ let dedicated_executor =
+ DedicatedExecutor::new("task_runner", executor_specification.task_slots as usize);
+
loop {
+ // Wait for task slots to be available before asking for new work
+ let permit = available_task_slots.acquire().await.unwrap();
+ // Make the slot available again
+ drop(permit);
+
// Keeps track of whether we received task in last iteration
// to avoid going in sleep mode between polling
let mut active_job = false;
- let can_accept_task = available_tasks_slots.load(Ordering::SeqCst) > 0;
-
- // Don't poll for work if we can not accept any tasks
- if !can_accept_task {
- tokio::time::sleep(Duration::from_millis(1)).await;
- continue;
- }
-
let task_status: Vec<TaskStatus> =
sample_tasks_status(&mut task_status_receiver).await;
@@ -84,7 +86,7 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
> = scheduler
.poll_work(PollWorkParams {
metadata: Some(executor.metadata.clone()),
- can_accept_task,
+ can_accept_task: true,
task_status,
})
.await;
@@ -94,12 +96,17 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
match poll_work_result {
Ok(result) => {
if let Some(task) = result.into_inner().task {
- match run_received_tasks(
+ // Acquire a permit/slot for the task
+ let permit =
+ available_task_slots.clone().acquire_owned().await.unwrap();
+
+ match run_received_task(
executor.clone(),
- available_tasks_slots.clone(),
+ permit,
task_status_sender,
task,
&codec,
+ &dedicated_executor,
)
.await
{
@@ -139,12 +146,13 @@ pub(crate) fn any_to_string(any: &Box<dyn Any + Send>) -> String {
}
}
-async fn run_received_tasks<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
+async fn run_received_task<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
executor: Arc<Executor>,
- available_tasks_slots: Arc<AtomicUsize>,
+ permit: OwnedSemaphorePermit,
task_status_sender: Sender<TaskStatus>,
task: TaskDefinition,
codec: &BallistaCodec<T, U>,
+ dedicated_executor: &DedicatedExecutor,
) -> Result<(), BallistaError> {
let task_id = task.task_id;
let task_attempt_num = task.task_attempt_num;
@@ -162,7 +170,6 @@ async fn run_received_tasks<T: 'static + AsLogicalPlan, U: 'static + AsExecution
task_id, job_id, stage_id, stage_attempt_num, partition_id, task_attempt_num
);
info!("Received task {}", task_identity);
- available_tasks_slots.fetch_sub(1, Ordering::SeqCst);
let mut task_props = HashMap::new();
for kv_pair in task.props {
@@ -206,7 +213,7 @@ async fn run_received_tasks<T: 'static + AsLogicalPlan, U: 'static + AsExecution
let shuffle_writer_plan =
executor.new_shuffle_writer(job_id.clone(), stage_id as usize, plan)?;
- tokio::spawn(async move {
+ dedicated_executor.spawn(async move {
use std::panic::AssertUnwindSafe;
let part = PartitionId {
job_id: job_id.clone(),
@@ -234,7 +241,6 @@ async fn run_received_tasks<T: 'static + AsLogicalPlan, U: 'static + AsExecution
info!("Done with task {}", task_identity);
debug!("Statistics: {:?}", execution_result);
- available_tasks_slots.fetch_add(1, Ordering::SeqCst);
let plan_metrics = collect_plan_metrics(shuffle_writer_plan.as_ref());
let operator_metrics = plan_metrics
@@ -263,6 +269,9 @@ async fn run_received_tasks<T: 'static + AsLogicalPlan, U: 'static + AsExecution
operator_metrics,
task_execution_times,
));
+
+ // Release the permit after the work is done
+ drop(permit);
});
Ok(())