You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2022/10/20 10:54:16 UTC

[arrow-ballista] branch master updated: Pull-based execution loop improvements (#380)

This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new f58d7191 Pull-based execution loop improvements (#380)
f58d7191 is described below

commit f58d7191696bb78b6d687b68f01c07d6030d1816
Author: Daniël Heres <da...@gmail.com>
AuthorDate: Thu Oct 20 12:54:12 2022 +0200

    Pull-based execution loop improvements (#380)
    
    * Use dedicated executor in execution loop
    
    * Switch to semaphore
    
    * Add line
    
    * Move before spawn
    
    * Move
    
    * Move after send
    
    * Move acquire in the loop
    
    * Lint, naming
---
 ballista/executor/src/execution_loop.rs | 47 ++++++++++++++++++++-------------
 1 file changed, 28 insertions(+), 19 deletions(-)

diff --git a/ballista/executor/src/execution_loop.rs b/ballista/executor/src/execution_loop.rs
index e6ad3109..990a0f89 100644
--- a/ballista/executor/src/execution_loop.rs
+++ b/ballista/executor/src/execution_loop.rs
@@ -21,7 +21,9 @@ use ballista_core::serde::protobuf::{
     scheduler_grpc_client::SchedulerGrpcClient, PollWorkParams, PollWorkResult,
     TaskDefinition, TaskStatus,
 };
+use tokio::sync::{OwnedSemaphorePermit, Semaphore};
 
+use crate::cpu_bound_executor::DedicatedExecutor;
 use crate::executor::Executor;
 use crate::{as_task_status, TaskExecutionTimes};
 use ballista_core::error::BallistaError;
@@ -38,7 +40,6 @@ use std::collections::HashMap;
 use std::convert::TryInto;
 use std::error::Error;
 use std::ops::Deref;
-use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::mpsc::{Receiver, Sender, TryRecvError};
 use std::time::{SystemTime, UNIX_EPOCH};
 use std::{sync::Arc, time::Duration};
@@ -56,25 +57,26 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
         .unwrap()
         .clone()
         .into();
-    let available_tasks_slots =
-        Arc::new(AtomicUsize::new(executor_specification.task_slots as usize));
+    let available_task_slots =
+        Arc::new(Semaphore::new(executor_specification.task_slots as usize));
+
     let (task_status_sender, mut task_status_receiver) =
         std::sync::mpsc::channel::<TaskStatus>();
     info!("Starting poll work loop with scheduler");
 
+    let dedicated_executor =
+        DedicatedExecutor::new("task_runner", executor_specification.task_slots as usize);
+
     loop {
+        // Wait for task slots to be available before asking for new work
+        let permit = available_task_slots.acquire().await.unwrap();
+        // Make the slot available again
+        drop(permit);
+
         // Keeps track of whether we received task in last iteration
         // to avoid going in sleep mode between polling
         let mut active_job = false;
 
-        let can_accept_task = available_tasks_slots.load(Ordering::SeqCst) > 0;
-
-        // Don't poll for work if we can not accept any tasks
-        if !can_accept_task {
-            tokio::time::sleep(Duration::from_millis(1)).await;
-            continue;
-        }
-
         let task_status: Vec<TaskStatus> =
             sample_tasks_status(&mut task_status_receiver).await;
 
@@ -84,7 +86,7 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
         > = scheduler
             .poll_work(PollWorkParams {
                 metadata: Some(executor.metadata.clone()),
-                can_accept_task,
+                can_accept_task: true,
                 task_status,
             })
             .await;
@@ -94,12 +96,17 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
         match poll_work_result {
             Ok(result) => {
                 if let Some(task) = result.into_inner().task {
-                    match run_received_tasks(
+                    // Acquire a permit/slot for the task
+                    let permit =
+                        available_task_slots.clone().acquire_owned().await.unwrap();
+
+                    match run_received_task(
                         executor.clone(),
-                        available_tasks_slots.clone(),
+                        permit,
                         task_status_sender,
                         task,
                         &codec,
+                        &dedicated_executor,
                     )
                     .await
                     {
@@ -139,12 +146,13 @@ pub(crate) fn any_to_string(any: &Box<dyn Any + Send>) -> String {
     }
 }
 
-async fn run_received_tasks<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
+async fn run_received_task<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
     executor: Arc<Executor>,
-    available_tasks_slots: Arc<AtomicUsize>,
+    permit: OwnedSemaphorePermit,
     task_status_sender: Sender<TaskStatus>,
     task: TaskDefinition,
     codec: &BallistaCodec<T, U>,
+    dedicated_executor: &DedicatedExecutor,
 ) -> Result<(), BallistaError> {
     let task_id = task.task_id;
     let task_attempt_num = task.task_attempt_num;
@@ -162,7 +170,6 @@ async fn run_received_tasks<T: 'static + AsLogicalPlan, U: 'static + AsExecution
         task_id, job_id, stage_id, stage_attempt_num, partition_id, task_attempt_num
     );
     info!("Received task {}", task_identity);
-    available_tasks_slots.fetch_sub(1, Ordering::SeqCst);
 
     let mut task_props = HashMap::new();
     for kv_pair in task.props {
@@ -206,7 +213,7 @@ async fn run_received_tasks<T: 'static + AsLogicalPlan, U: 'static + AsExecution
 
     let shuffle_writer_plan =
         executor.new_shuffle_writer(job_id.clone(), stage_id as usize, plan)?;
-    tokio::spawn(async move {
+    dedicated_executor.spawn(async move {
         use std::panic::AssertUnwindSafe;
         let part = PartitionId {
             job_id: job_id.clone(),
@@ -234,7 +241,6 @@ async fn run_received_tasks<T: 'static + AsLogicalPlan, U: 'static + AsExecution
 
         info!("Done with task {}", task_identity);
         debug!("Statistics: {:?}", execution_result);
-        available_tasks_slots.fetch_add(1, Ordering::SeqCst);
 
         let plan_metrics = collect_plan_metrics(shuffle_writer_plan.as_ref());
         let operator_metrics = plan_metrics
@@ -263,6 +269,9 @@ async fn run_received_tasks<T: 'static + AsLogicalPlan, U: 'static + AsExecution
             operator_metrics,
             task_execution_times,
         ));
+
+        // Release the permit after the work is done
+        drop(permit);
     });
 
     Ok(())