You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/08/17 01:10:53 UTC

[arrow-ballista] branch master updated: Spawn a thread for execution plan generation (#135)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new e8bc8bb0 Spawn a thread for execution plan generation (#135)
e8bc8bb0 is described below

commit e8bc8bb07a12ed98fc5ddc193378d551a62b463f
Author: yahoNanJing <90...@users.noreply.github.com>
AuthorDate: Wed Aug 17 09:10:48 2022 +0800

    Spawn a thread for execution plan generation (#135)
    
    * Remove session id in QueryStageSchedulerEvent::JobQueued
    
    * Refine on_receive interface of EventAction
    
    * Spawn a thread for execution plan generation
    
    Co-authored-by: yangzhong <ya...@ebay.com>
---
 ballista/rust/core/src/event_loop.rs               | 23 +++---
 ballista/rust/scheduler/src/flight_sql.rs          |  1 -
 .../rust/scheduler/src/scheduler_server/event.rs   |  1 -
 .../scheduler/src/scheduler_server/event_loop.rs   | 13 +++-
 .../rust/scheduler/src/scheduler_server/grpc.rs    |  1 -
 .../rust/scheduler/src/scheduler_server/mod.rs     |  6 --
 .../src/scheduler_server/query_stage_scheduler.rs  | 90 ++++++++++++----------
 ballista/rust/scheduler/src/test_utils.rs          |  8 +-
 8 files changed, 76 insertions(+), 67 deletions(-)

diff --git a/ballista/rust/core/src/event_loop.rs b/ballista/rust/core/src/event_loop.rs
index 217b7fc3..595bd33f 100644
--- a/ballista/rust/core/src/event_loop.rs
+++ b/ballista/rust/core/src/event_loop.rs
@@ -30,7 +30,12 @@ pub trait EventAction<E>: Send + Sync {
 
     fn on_stop(&self);
 
-    async fn on_receive(&self, event: E) -> Result<Option<E>>;
+    async fn on_receive(
+        &self,
+        event: E,
+        tx_event: &mpsc::Sender<E>,
+        rx_event: &mpsc::Receiver<E>,
+    ) -> Result<()>;
 
     fn on_error(&self, error: BallistaError);
 }
@@ -72,19 +77,9 @@ impl<E: Send + 'static> EventLoop<E> {
             info!("Starting the event loop {}", name);
             while !stopped.load(Ordering::SeqCst) {
                 if let Some(event) = rx_event.recv().await {
-                    match action.on_receive(event).await {
-                        Ok(Some(event)) => {
-                            if let Err(e) = tx_event.send(event).await {
-                                let msg = format!("Fail to send event due to {}", e);
-                                error!("{}", msg);
-                                action.on_error(BallistaError::General(msg));
-                            }
-                        }
-                        Err(e) => {
-                            error!("Fail to process event due to {}", e);
-                            action.on_error(e);
-                        }
-                        _ => {}
+                    if let Err(e) = action.on_receive(event, &tx_event, &rx_event).await {
+                        error!("Fail to process event due to {}", e);
+                        action.on_error(e);
                     }
                 } else {
                     info!("Event Channel closed, shutting down");
diff --git a/ballista/rust/scheduler/src/flight_sql.rs b/ballista/rust/scheduler/src/flight_sql.rs
index 1a7275b0..08486fc2 100644
--- a/ballista/rust/scheduler/src/flight_sql.rs
+++ b/ballista/rust/scheduler/src/flight_sql.rs
@@ -256,7 +256,6 @@ impl FlightSqlServiceImpl {
         query_stage_event_sender
             .post_event(QueryStageSchedulerEvent::JobQueued {
                 job_id: job_id.clone(),
-                session_id: ctx.session_id().clone(),
                 session_ctx: ctx,
                 plan: Box::new(plan.clone()),
             })
diff --git a/ballista/rust/scheduler/src/scheduler_server/event.rs b/ballista/rust/scheduler/src/scheduler_server/event.rs
index 458fb875..bcfa3e3a 100644
--- a/ballista/rust/scheduler/src/scheduler_server/event.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/event.rs
@@ -32,7 +32,6 @@ pub enum SchedulerServerEvent {
 pub enum QueryStageSchedulerEvent {
     JobQueued {
         job_id: String,
-        session_id: String,
         session_ctx: Arc<SessionContext>,
         plan: Box<LogicalPlan>,
     },
diff --git a/ballista/rust/scheduler/src/scheduler_server/event_loop.rs b/ballista/rust/scheduler/src/scheduler_server/event_loop.rs
index d6397ba6..b8e8d9fc 100644
--- a/ballista/rust/scheduler/src/scheduler_server/event_loop.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/event_loop.rs
@@ -26,6 +26,7 @@ use ballista_core::event_loop::EventAction;
 
 use ballista_core::serde::AsExecutionPlan;
 use datafusion_proto::logical_plan::AsLogicalPlan;
+use tokio::sync::mpsc;
 
 use crate::state::executor_manager::ExecutorReservation;
 use crate::state::SchedulerState;
@@ -137,12 +138,20 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
     async fn on_receive(
         &self,
         event: SchedulerServerEvent,
-    ) -> Result<Option<SchedulerServerEvent>> {
+        tx_event: &mpsc::Sender<SchedulerServerEvent>,
+        _rx_event: &mpsc::Receiver<SchedulerServerEvent>,
+    ) -> Result<()> {
         match event {
             SchedulerServerEvent::Offer(reservations) => {
-                self.offer_reservation(reservations).await
+                if let Some(event) = self.offer_reservation(reservations).await? {
+                    tx_event.send(event).await.map_err(|e| {
+                        BallistaError::General(format!("Fail to send event due to {}", e))
+                    })?;
+                }
             }
         }
+
+        Ok(())
     }
 
     fn on_error(&self, error: BallistaError) {
diff --git a/ballista/rust/scheduler/src/scheduler_server/grpc.rs b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
index 9ef898ac..1ec73c04 100644
--- a/ballista/rust/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
@@ -424,7 +424,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
             query_stage_event_sender
                 .post_event(QueryStageSchedulerEvent::JobQueued {
                     job_id: job_id.clone(),
-                    session_id: session_id.clone(),
                     session_ctx,
                     plan: Box::new(plan),
                 })
diff --git a/ballista/rust/scheduler/src/scheduler_server/mod.rs b/ballista/rust/scheduler/src/scheduler_server/mod.rs
index 2fac229b..e84f595a 100644
--- a/ballista/rust/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/mod.rs
@@ -431,7 +431,6 @@ mod test {
             .await?;
 
         let job_id = "job";
-        let session_id = ctx.session_id();
 
         // Send JobQueued event to kick off the event loop
         scheduler
@@ -439,7 +438,6 @@ mod test {
             .get_sender()?
             .post_event(QueryStageSchedulerEvent::JobQueued {
                 job_id: job_id.to_owned(),
-                session_id,
                 session_ctx: ctx,
                 plan: Box::new(plan),
             })
@@ -572,7 +570,6 @@ mod test {
             .await?;
 
         let job_id = "job";
-        let session_id = ctx.session_id();
 
         // Send JobQueued event to kick off the event loop
         scheduler
@@ -580,7 +577,6 @@ mod test {
             .get_sender()?
             .post_event(QueryStageSchedulerEvent::JobQueued {
                 job_id: job_id.to_owned(),
-                session_id,
                 session_ctx: ctx,
                 plan: Box::new(plan),
             })
@@ -707,7 +703,6 @@ mod test {
         let plan = ctx.sql("SELECT * FROM explode").await?.to_logical_plan()?;
 
         let job_id = "job";
-        let session_id = ctx.session_id();
 
         // Send JobQueued event to kick off the event loop
         // This should fail when we try and create the physical plan
@@ -716,7 +711,6 @@ mod test {
             .get_sender()?
             .post_event(QueryStageSchedulerEvent::JobQueued {
                 job_id: job_id.to_owned(),
-                session_id,
                 session_ctx: ctx,
                 plan: Box::new(plan),
             })
diff --git a/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs b/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
index 8596aa60..8ae059f8 100644
--- a/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -28,6 +28,7 @@ use ballista_core::event_loop::{EventAction, EventSender};
 
 use ballista_core::serde::AsExecutionPlan;
 use datafusion_proto::logical_plan::AsLogicalPlan;
+use tokio::sync::mpsc;
 
 use crate::scheduler_server::event::{QueryStageSchedulerEvent, SchedulerServerEvent};
 
@@ -52,32 +53,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> QueryStageSchedul
             event_sender,
         }
     }
-
-    async fn submit_job(
-        &self,
-        job_id: String,
-        session_id: String,
-        session_ctx: Arc<SessionContext>,
-        plan: &LogicalPlan,
-    ) -> Result<()> {
-        let start = Instant::now();
-        let optimized_plan = session_ctx.optimize(plan)?;
-
-        debug!("Calculated optimized plan: {:?}", optimized_plan);
-
-        let plan = session_ctx.create_physical_plan(&optimized_plan).await?;
-
-        self.state
-            .task_manager
-            .submit_job(&job_id, &session_id, plan.clone())
-            .await?;
-
-        let elapsed = start.elapsed();
-
-        info!("Planned job {} in {:?}", job_id, elapsed);
-
-        Ok(())
-    }
 }
 
 #[async_trait]
@@ -95,25 +70,37 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
     async fn on_receive(
         &self,
         event: QueryStageSchedulerEvent,
-    ) -> Result<Option<QueryStageSchedulerEvent>> {
+        tx_event: &mpsc::Sender<QueryStageSchedulerEvent>,
+        _rx_event: &mpsc::Receiver<QueryStageSchedulerEvent>,
+    ) -> Result<()> {
         match event {
             QueryStageSchedulerEvent::JobQueued {
                 job_id,
-                session_id,
                 session_ctx,
                 plan,
             } => {
                 info!("Job {} queued", job_id);
-                return if let Err(e) = self
-                    .submit_job(job_id.clone(), session_id, session_ctx, &plan)
-                    .await
-                {
-                    let msg = format!("Error planning job {}: {:?}", job_id, e);
-                    error!("{}", msg);
-                    Ok(Some(QueryStageSchedulerEvent::JobFailed(job_id, msg)))
-                } else {
-                    Ok(Some(QueryStageSchedulerEvent::JobSubmitted(job_id)))
-                };
+                let state = self.state.clone();
+                let tx_event = tx_event.clone();
+                tokio::spawn(async move {
+                    let event = if let Err(e) =
+                        submit_job(state.clone(), job_id.clone(), session_ctx, &plan)
+                            .await
+                    {
+                        let msg = format!("Error planning job {}: {:?}", job_id, e);
+                        error!("{}", msg);
+                        QueryStageSchedulerEvent::JobFailed(job_id, msg)
+                    } else {
+                        QueryStageSchedulerEvent::JobSubmitted(job_id)
+                    };
+                    tx_event
+                        .send(event)
+                        .await
+                        .map_err(|e| {
+                            error!("Fail to send event due to {}", e);
+                        })
+                        .unwrap();
+                });
             }
             QueryStageSchedulerEvent::JobSubmitted(job_id) => {
                 info!("Job {} submitted", job_id);
@@ -164,10 +151,35 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
             }
         }
 
-        Ok(None)
+        Ok(())
     }
 
     fn on_error(&self, error: BallistaError) {
         error!("Error received by QueryStageScheduler: {:?}", error);
     }
 }
+
+async fn submit_job<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
+    state: Arc<SchedulerState<T, U>>,
+    job_id: String,
+    session_ctx: Arc<SessionContext>,
+    plan: &LogicalPlan,
+) -> Result<()> {
+    let start = Instant::now();
+    let optimized_plan = session_ctx.optimize(plan)?;
+
+    debug!("Calculated optimized plan: {:?}", optimized_plan);
+
+    let plan = session_ctx.create_physical_plan(&optimized_plan).await?;
+
+    state
+        .task_manager
+        .submit_job(&job_id, &session_ctx.session_id(), plan.clone())
+        .await?;
+
+    let elapsed = start.elapsed();
+
+    info!("Planned job {} in {:?}", job_id, elapsed);
+
+    Ok(())
+}
diff --git a/ballista/rust/scheduler/src/test_utils.rs b/ballista/rust/scheduler/src/test_utils.rs
index 6cbd36ff..c60e8ff3 100644
--- a/ballista/rust/scheduler/src/test_utils.rs
+++ b/ballista/rust/scheduler/src/test_utils.rs
@@ -33,7 +33,7 @@ use datafusion::execution::context::{SessionConfig, SessionContext, SessionState
 use datafusion::logical_expr::Expr;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::CsvReadOptions;
-use tokio::sync::mpsc::Sender;
+use tokio::sync::mpsc::{Receiver, Sender};
 
 pub const TPCH_TABLES: &[&str] = &[
     "part", "supplier", "partsupp", "customer", "orders", "lineitem", "nation", "region",
@@ -63,10 +63,12 @@ impl EventAction<SchedulerServerEvent> for SchedulerEventObserver {
     async fn on_receive(
         &self,
         event: SchedulerServerEvent,
-    ) -> Result<Option<SchedulerServerEvent>> {
+        _tx_event: &Sender<SchedulerServerEvent>,
+        _rx_event: &Receiver<SchedulerServerEvent>,
+    ) -> Result<()> {
         self.sender.send(event).await.unwrap();
 
-        Ok(None)
+        Ok(())
     }
 
     fn on_error(&self, error: BallistaError) {