You are viewing a plain-text version of this content; the hyperlink to the canonical version is not preserved in this format.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/08/17 01:10:53 UTC
[arrow-ballista] branch master updated: Spawn a thread for execution plan generation (#135)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new e8bc8bb0 Spawn a thread for execution plan generation (#135)
e8bc8bb0 is described below
commit e8bc8bb07a12ed98fc5ddc193378d551a62b463f
Author: yahoNanJing <90...@users.noreply.github.com>
AuthorDate: Wed Aug 17 09:10:48 2022 +0800
Spawn a thread for execution plan generation (#135)
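* Remove the session_id field from QueryStageSchedulerEvent::JobQueued (the session id is now read from session_ctx)
* Refine the on_receive interface of EventAction: it now receives the event channel's sender and receiver and returns Result<()> instead of an optional follow-up event
* Spawn a Tokio task for execution plan generation instead of planning inline in the event loop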
Co-authored-by: yangzhong <ya...@ebay.com>
---
ballista/rust/core/src/event_loop.rs | 23 +++---
ballista/rust/scheduler/src/flight_sql.rs | 1 -
.../rust/scheduler/src/scheduler_server/event.rs | 1 -
.../scheduler/src/scheduler_server/event_loop.rs | 13 +++-
.../rust/scheduler/src/scheduler_server/grpc.rs | 1 -
.../rust/scheduler/src/scheduler_server/mod.rs | 6 --
.../src/scheduler_server/query_stage_scheduler.rs | 90 ++++++++++++----------
ballista/rust/scheduler/src/test_utils.rs | 8 +-
8 files changed, 76 insertions(+), 67 deletions(-)
diff --git a/ballista/rust/core/src/event_loop.rs b/ballista/rust/core/src/event_loop.rs
index 217b7fc3..595bd33f 100644
--- a/ballista/rust/core/src/event_loop.rs
+++ b/ballista/rust/core/src/event_loop.rs
@@ -30,7 +30,12 @@ pub trait EventAction<E>: Send + Sync {
fn on_stop(&self);
- async fn on_receive(&self, event: E) -> Result<Option<E>>;
+ async fn on_receive(
+ &self,
+ event: E,
+ tx_event: &mpsc::Sender<E>,
+ rx_event: &mpsc::Receiver<E>,
+ ) -> Result<()>;
fn on_error(&self, error: BallistaError);
}
@@ -72,19 +77,9 @@ impl<E: Send + 'static> EventLoop<E> {
info!("Starting the event loop {}", name);
while !stopped.load(Ordering::SeqCst) {
if let Some(event) = rx_event.recv().await {
- match action.on_receive(event).await {
- Ok(Some(event)) => {
- if let Err(e) = tx_event.send(event).await {
- let msg = format!("Fail to send event due to {}", e);
- error!("{}", msg);
- action.on_error(BallistaError::General(msg));
- }
- }
- Err(e) => {
- error!("Fail to process event due to {}", e);
- action.on_error(e);
- }
- _ => {}
+ if let Err(e) = action.on_receive(event, &tx_event, &rx_event).await {
+ error!("Fail to process event due to {}", e);
+ action.on_error(e);
}
} else {
info!("Event Channel closed, shutting down");
diff --git a/ballista/rust/scheduler/src/flight_sql.rs b/ballista/rust/scheduler/src/flight_sql.rs
index 1a7275b0..08486fc2 100644
--- a/ballista/rust/scheduler/src/flight_sql.rs
+++ b/ballista/rust/scheduler/src/flight_sql.rs
@@ -256,7 +256,6 @@ impl FlightSqlServiceImpl {
query_stage_event_sender
.post_event(QueryStageSchedulerEvent::JobQueued {
job_id: job_id.clone(),
- session_id: ctx.session_id().clone(),
session_ctx: ctx,
plan: Box::new(plan.clone()),
})
diff --git a/ballista/rust/scheduler/src/scheduler_server/event.rs b/ballista/rust/scheduler/src/scheduler_server/event.rs
index 458fb875..bcfa3e3a 100644
--- a/ballista/rust/scheduler/src/scheduler_server/event.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/event.rs
@@ -32,7 +32,6 @@ pub enum SchedulerServerEvent {
pub enum QueryStageSchedulerEvent {
JobQueued {
job_id: String,
- session_id: String,
session_ctx: Arc<SessionContext>,
plan: Box<LogicalPlan>,
},
diff --git a/ballista/rust/scheduler/src/scheduler_server/event_loop.rs b/ballista/rust/scheduler/src/scheduler_server/event_loop.rs
index d6397ba6..b8e8d9fc 100644
--- a/ballista/rust/scheduler/src/scheduler_server/event_loop.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/event_loop.rs
@@ -26,6 +26,7 @@ use ballista_core::event_loop::EventAction;
use ballista_core::serde::AsExecutionPlan;
use datafusion_proto::logical_plan::AsLogicalPlan;
+use tokio::sync::mpsc;
use crate::state::executor_manager::ExecutorReservation;
use crate::state::SchedulerState;
@@ -137,12 +138,20 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
async fn on_receive(
&self,
event: SchedulerServerEvent,
- ) -> Result<Option<SchedulerServerEvent>> {
+ tx_event: &mpsc::Sender<SchedulerServerEvent>,
+ _rx_event: &mpsc::Receiver<SchedulerServerEvent>,
+ ) -> Result<()> {
match event {
SchedulerServerEvent::Offer(reservations) => {
- self.offer_reservation(reservations).await
+ if let Some(event) = self.offer_reservation(reservations).await? {
+ tx_event.send(event).await.map_err(|e| {
+ BallistaError::General(format!("Fail to send event due to {}", e))
+ })?;
+ }
}
}
+
+ Ok(())
}
fn on_error(&self, error: BallistaError) {
diff --git a/ballista/rust/scheduler/src/scheduler_server/grpc.rs b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
index 9ef898ac..1ec73c04 100644
--- a/ballista/rust/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
@@ -424,7 +424,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
query_stage_event_sender
.post_event(QueryStageSchedulerEvent::JobQueued {
job_id: job_id.clone(),
- session_id: session_id.clone(),
session_ctx,
plan: Box::new(plan),
})
diff --git a/ballista/rust/scheduler/src/scheduler_server/mod.rs b/ballista/rust/scheduler/src/scheduler_server/mod.rs
index 2fac229b..e84f595a 100644
--- a/ballista/rust/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/mod.rs
@@ -431,7 +431,6 @@ mod test {
.await?;
let job_id = "job";
- let session_id = ctx.session_id();
// Send JobQueued event to kick off the event loop
scheduler
@@ -439,7 +438,6 @@ mod test {
.get_sender()?
.post_event(QueryStageSchedulerEvent::JobQueued {
job_id: job_id.to_owned(),
- session_id,
session_ctx: ctx,
plan: Box::new(plan),
})
@@ -572,7 +570,6 @@ mod test {
.await?;
let job_id = "job";
- let session_id = ctx.session_id();
// Send JobQueued event to kick off the event loop
scheduler
@@ -580,7 +577,6 @@ mod test {
.get_sender()?
.post_event(QueryStageSchedulerEvent::JobQueued {
job_id: job_id.to_owned(),
- session_id,
session_ctx: ctx,
plan: Box::new(plan),
})
@@ -707,7 +703,6 @@ mod test {
let plan = ctx.sql("SELECT * FROM explode").await?.to_logical_plan()?;
let job_id = "job";
- let session_id = ctx.session_id();
// Send JobQueued event to kick off the event loop
// This should fail when we try and create the physical plan
@@ -716,7 +711,6 @@ mod test {
.get_sender()?
.post_event(QueryStageSchedulerEvent::JobQueued {
job_id: job_id.to_owned(),
- session_id,
session_ctx: ctx,
plan: Box::new(plan),
})
diff --git a/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs b/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
index 8596aa60..8ae059f8 100644
--- a/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -28,6 +28,7 @@ use ballista_core::event_loop::{EventAction, EventSender};
use ballista_core::serde::AsExecutionPlan;
use datafusion_proto::logical_plan::AsLogicalPlan;
+use tokio::sync::mpsc;
use crate::scheduler_server::event::{QueryStageSchedulerEvent, SchedulerServerEvent};
@@ -52,32 +53,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> QueryStageSchedul
event_sender,
}
}
-
- async fn submit_job(
- &self,
- job_id: String,
- session_id: String,
- session_ctx: Arc<SessionContext>,
- plan: &LogicalPlan,
- ) -> Result<()> {
- let start = Instant::now();
- let optimized_plan = session_ctx.optimize(plan)?;
-
- debug!("Calculated optimized plan: {:?}", optimized_plan);
-
- let plan = session_ctx.create_physical_plan(&optimized_plan).await?;
-
- self.state
- .task_manager
- .submit_job(&job_id, &session_id, plan.clone())
- .await?;
-
- let elapsed = start.elapsed();
-
- info!("Planned job {} in {:?}", job_id, elapsed);
-
- Ok(())
- }
}
#[async_trait]
@@ -95,25 +70,37 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
async fn on_receive(
&self,
event: QueryStageSchedulerEvent,
- ) -> Result<Option<QueryStageSchedulerEvent>> {
+ tx_event: &mpsc::Sender<QueryStageSchedulerEvent>,
+ _rx_event: &mpsc::Receiver<QueryStageSchedulerEvent>,
+ ) -> Result<()> {
match event {
QueryStageSchedulerEvent::JobQueued {
job_id,
- session_id,
session_ctx,
plan,
} => {
info!("Job {} queued", job_id);
- return if let Err(e) = self
- .submit_job(job_id.clone(), session_id, session_ctx, &plan)
- .await
- {
- let msg = format!("Error planning job {}: {:?}", job_id, e);
- error!("{}", msg);
- Ok(Some(QueryStageSchedulerEvent::JobFailed(job_id, msg)))
- } else {
- Ok(Some(QueryStageSchedulerEvent::JobSubmitted(job_id)))
- };
+ let state = self.state.clone();
+ let tx_event = tx_event.clone();
+ tokio::spawn(async move {
+ let event = if let Err(e) =
+ submit_job(state.clone(), job_id.clone(), session_ctx, &plan)
+ .await
+ {
+ let msg = format!("Error planning job {}: {:?}", job_id, e);
+ error!("{}", msg);
+ QueryStageSchedulerEvent::JobFailed(job_id, msg)
+ } else {
+ QueryStageSchedulerEvent::JobSubmitted(job_id)
+ };
+ tx_event
+ .send(event)
+ .await
+ .map_err(|e| {
+ error!("Fail to send event due to {}", e);
+ })
+ .unwrap();
+ });
}
QueryStageSchedulerEvent::JobSubmitted(job_id) => {
info!("Job {} submitted", job_id);
@@ -164,10 +151,35 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
}
}
- Ok(None)
+ Ok(())
}
fn on_error(&self, error: BallistaError) {
error!("Error received by QueryStageScheduler: {:?}", error);
}
}
+
+async fn submit_job<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
+ state: Arc<SchedulerState<T, U>>,
+ job_id: String,
+ session_ctx: Arc<SessionContext>,
+ plan: &LogicalPlan,
+) -> Result<()> {
+ let start = Instant::now();
+ let optimized_plan = session_ctx.optimize(plan)?;
+
+ debug!("Calculated optimized plan: {:?}", optimized_plan);
+
+ let plan = session_ctx.create_physical_plan(&optimized_plan).await?;
+
+ state
+ .task_manager
+ .submit_job(&job_id, &session_ctx.session_id(), plan.clone())
+ .await?;
+
+ let elapsed = start.elapsed();
+
+ info!("Planned job {} in {:?}", job_id, elapsed);
+
+ Ok(())
+}
diff --git a/ballista/rust/scheduler/src/test_utils.rs b/ballista/rust/scheduler/src/test_utils.rs
index 6cbd36ff..c60e8ff3 100644
--- a/ballista/rust/scheduler/src/test_utils.rs
+++ b/ballista/rust/scheduler/src/test_utils.rs
@@ -33,7 +33,7 @@ use datafusion::execution::context::{SessionConfig, SessionContext, SessionState
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::CsvReadOptions;
-use tokio::sync::mpsc::Sender;
+use tokio::sync::mpsc::{Receiver, Sender};
pub const TPCH_TABLES: &[&str] = &[
"part", "supplier", "partsupp", "customer", "orders", "lineitem", "nation", "region",
@@ -63,10 +63,12 @@ impl EventAction<SchedulerServerEvent> for SchedulerEventObserver {
async fn on_receive(
&self,
event: SchedulerServerEvent,
- ) -> Result<Option<SchedulerServerEvent>> {
+ _tx_event: &Sender<SchedulerServerEvent>,
+ _rx_event: &Receiver<SchedulerServerEvent>,
+ ) -> Result<()> {
self.sender.send(event).await.unwrap();
- Ok(None)
+ Ok(())
}
fn on_error(&self, error: BallistaError) {