You are viewing a plain text version of this content. The canonical link to it was not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/08/22 04:33:12 UTC
[arrow-ballista] branch master updated: Stop Executor Impl, Executor Graceful Shutdown (#151)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new 3f80b42a Stop Executor Impl, Executor Graceful Shutdown (#151)
3f80b42a is described below
commit 3f80b42aca15380c5dfd053b53753067e57801f8
Author: mingmwang <mi...@ebay.com>
AuthorDate: Mon Aug 22 12:33:08 2022 +0800
Stop Executor Impl, Executor Graceful Shutdown (#151)
---
ballista/rust/core/proto/ballista.proto | 4 +
ballista/rust/executor/src/execution_loop.rs | 9 +-
ballista/rust/executor/src/executor_server.rs | 135 +++++++++++++++----
ballista/rust/executor/src/flight_service.rs | 17 +--
ballista/rust/executor/src/lib.rs | 1 +
ballista/rust/executor/src/main.rs | 187 +++++++++++++++++++++-----
ballista/rust/executor/src/shutdown.rs | 105 +++++++++++++++
ballista/rust/executor/src/standalone.rs | 2 +-
8 files changed, 387 insertions(+), 73 deletions(-)
diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 7472d7ae..0ebc922d 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -681,6 +681,10 @@ message HeartBeatResult {
}
message StopExecutorParams {
+ // reason for stopping the executor (logged by the executor when the request arrives)
+ string reason = 1;
+ // request an immediate (forced) stop; NOTE(review): the executor main loop does not yet act on this flag
+ bool force = 2;
}
message StopExecutorResult {
diff --git a/ballista/rust/executor/src/execution_loop.rs b/ballista/rust/executor/src/execution_loop.rs
index 471377e4..f7e029c2 100644
--- a/ballista/rust/executor/src/execution_loop.rs
+++ b/ballista/rust/executor/src/execution_loop.rs
@@ -32,7 +32,7 @@ use ballista_core::utils::collect_plan_metrics;
use datafusion::execution::context::TaskContext;
use datafusion_proto::logical_plan::AsLogicalPlan;
use futures::FutureExt;
-use log::{debug, error, info, trace, warn};
+use log::{debug, error, info, warn};
use std::any::Any;
use std::collections::HashMap;
use std::convert::TryInto;
@@ -47,7 +47,7 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
mut scheduler: SchedulerGrpcClient<Channel>,
executor: Arc<Executor>,
codec: BallistaCodec<T, U>,
-) {
+) -> Result<(), BallistaError> {
let executor_specification: ExecutorSpecification = executor
.metadata
.specification
@@ -59,10 +59,9 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
Arc::new(AtomicUsize::new(executor_specification.task_slots as usize));
let (task_status_sender, mut task_status_receiver) =
std::sync::mpsc::channel::<TaskStatus>();
+ info!("Starting poll work loop with scheduler");
loop {
- trace!("Starting registration loop with scheduler");
-
let task_status: Vec<TaskStatus> =
sample_tasks_status(&mut task_status_receiver).await;
@@ -108,7 +107,7 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
}
}
Err(error) => {
- warn!("Executor registration failed. If this continues to happen the executor might be marked as dead by the scheduler. Error: {}", error);
+ warn!("Executor poll work loop failed. If this continues to happen the Scheduler might be marked as dead. Error: {}", error);
}
}
if !active_job {
diff --git a/ballista/rust/executor/src/executor_server.rs b/ballista/rust/executor/src/executor_server.rs
index 591c88a1..8f0e758f 100644
--- a/ballista/rust/executor/src/executor_server.rs
+++ b/ballista/rust/executor/src/executor_server.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use ballista_core::BALLISTA_VERSION;
use std::collections::HashMap;
use std::convert::TryInto;
use std::ops::Deref;
@@ -45,16 +46,22 @@ use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_proto::logical_plan::AsLogicalPlan;
use tokio::sync::mpsc::error::TryRecvError;
+use tokio::task::JoinHandle;
use crate::as_task_status;
use crate::cpu_bound_executor::DedicatedExecutor;
use crate::executor::Executor;
+use crate::shutdown::ShutdownNotifier;
+
+type ServerHandle = JoinHandle<Result<(), BallistaError>>;
pub async fn startup<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
mut scheduler: SchedulerGrpcClient<Channel>,
executor: Arc<Executor>,
codec: BallistaCodec<T, U>,
-) {
+ stop_send: mpsc::Sender<bool>,
+ shutdown_noti: &ShutdownNotifier,
+) -> Result<ServerHandle, BallistaError> {
// TODO make the buffer size configurable
let (tx_task, rx_task) = mpsc::channel::<TaskDefinition>(1000);
let (tx_task_status, rx_task_status) = mpsc::channel::<TaskStatus>(1000);
@@ -65,12 +72,13 @@ pub async fn startup<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
ExecutorEnv {
tx_task,
tx_task_status,
+ tx_stop: stop_send,
},
codec,
);
// 1. Start executor grpc service
- {
+ let server = {
let executor_meta = executor.metadata.clone();
let addr = format!(
"{}:{}",
@@ -83,36 +91,53 @@ pub async fn startup<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
executor_meta.grpc_port
);
let addr = addr.parse().unwrap();
- info!("Setup executor grpc service for {:?}", addr);
+ info!(
+ "Ballista v{} Rust Executor Grpc Server listening on {:?}",
+ BALLISTA_VERSION, addr
+ );
let server = ExecutorGrpcServer::new(executor_server.clone());
- let grpc_server_future = create_grpc_server().add_service(server).serve(addr);
- tokio::spawn(async move { grpc_server_future.await });
- }
-
- let executor_server = Arc::new(executor_server);
+ let mut grpc_shutdown = shutdown_noti.subscribe_for_shutdown();
+ tokio::spawn(async move {
+ let shutdown_signal = grpc_shutdown.recv();
+ let grpc_server_future = create_grpc_server()
+ .add_service(server)
+ .serve_with_shutdown(addr, shutdown_signal);
+ grpc_server_future.await.map_err(|e| {
+ error!("Tonic error, Could not start Executor Grpc Server.");
+ BallistaError::TonicError(e)
+ })
+ })
+ };
// 2. Do executor registration
+ // TODO: executor registration should happen only after the executor gRPC server has started.
+ let executor_server = Arc::new(executor_server);
match register_executor(&mut scheduler, executor.clone()).await {
Ok(_) => {
info!("Executor registration succeed");
}
Err(error) => {
- panic!("Executor registration failed due to: {}", error);
+ error!("Executor registration failed due to: {}", error);
+ // abort the spawned Executor gRPC server task
+ server.abort();
+ return Err(error);
}
};
- // 3. Start Heartbeater
+ // 3. Start Heartbeater loop
{
let heartbeater = Heartbeater::new(executor_server.clone());
- heartbeater.start().await;
+ heartbeater.start(shutdown_noti);
}
- // 4. Start TaskRunnerPool
+ // 4. Start TaskRunnerPool loop
{
let task_runner_pool = TaskRunnerPool::new(executor_server.clone());
- task_runner_pool.start(rx_task, rx_task_status).await;
+ task_runner_pool.start(rx_task, rx_task_status, shutdown_noti);
}
+
+ Ok(server)
}
#[allow(clippy::clone_on_copy)]
@@ -149,6 +174,8 @@ struct ExecutorEnv {
tx_task: mpsc::Sender<TaskDefinition>,
/// Receive `TaskStatus` from CPU bound tasks pool `dedicated_executor` then use rpc send back to scheduler.
tx_task_status: mpsc::Sender<TaskStatus>,
+ /// Forwards stop executor requests received over RPC (carrying the `force` flag).
+ tx_stop: mpsc::Sender<bool>,
}
unsafe impl Sync for ExecutorEnv {}
@@ -280,6 +307,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
}
}
+/// The Heartbeater runs until a shutdown notification is received.
struct Heartbeater<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> {
executor_server: Arc<ExecutorServer<T, U>>,
}
@@ -289,18 +317,32 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> Heartbeater<T, U>
Self { executor_server }
}
- async fn start(&self) {
+ fn start(&self, shutdown_noti: &ShutdownNotifier) {
let executor_server = self.executor_server.clone();
+ let mut heartbeat_shutdown = shutdown_noti.subscribe_for_shutdown();
+ let heartbeat_complete = shutdown_noti.shutdown_complete_tx.clone();
tokio::spawn(async move {
info!("Starting heartbeater to send heartbeat the scheduler periodically");
- loop {
+ // As long as the shutdown notification has not been received
+ while !heartbeat_shutdown.is_shutdown() {
executor_server.heartbeat().await;
- tokio::time::sleep(Duration::from_millis(60000)).await;
+ tokio::select! {
+ _ = tokio::time::sleep(Duration::from_millis(60000)) => {},
+ _ = heartbeat_shutdown.recv() => {
+ info!("Stop heartbeater");
+ drop(heartbeat_complete);
+ return;
+ }
+ };
}
});
}
}
+/// Runs two loops (futures) as separate tasks on the tokio runtime.
+/// The first sends task statuses back to the scheduler.
+/// The second receives tasks from the scheduler and runs them.
+/// Both loops run until a shutdown notification is received or their input channel closes.
struct TaskRunnerPool<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> {
executor_server: Arc<ExecutorServer<T, U>>,
}
@@ -310,25 +352,36 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskRunnerPool<T,
Self { executor_server }
}
- // There are two loop(future) running separately in tokio runtime.
- // First is for sending back task status to scheduler
- // Second is for receiving task from scheduler and run
- async fn start(
+ fn start(
&self,
mut rx_task: mpsc::Receiver<TaskDefinition>,
mut rx_task_status: mpsc::Receiver<TaskStatus>,
+ shutdown_noti: &ShutdownNotifier,
) {
//1. loop for task status reporting
let executor_server = self.executor_server.clone();
+ let mut tasks_status_shutdown = shutdown_noti.subscribe_for_shutdown();
+ let tasks_status_complete = shutdown_noti.shutdown_complete_tx.clone();
tokio::spawn(async move {
info!("Starting the task status reporter");
- loop {
+ // As long as the shutdown notification has not been received
+ while !tasks_status_shutdown.is_shutdown() {
let mut tasks_status = vec![];
// First try to fetch task status from the channel in *blocking* mode
- if let Some(task_status) = rx_task_status.recv().await {
+ let maybe_task_status = tokio::select! {
+ task_status = rx_task_status.recv() => task_status,
+ _ = tasks_status_shutdown.recv() => {
+ info!("Stop task status reporting loop");
+ drop(tasks_status_complete);
+ return;
+ }
+ };
+
+ if let Some(task_status) = maybe_task_status {
tasks_status.push(task_status);
} else {
- info!("Channel is closed and will exit the loop");
+ info!("Channel is closed and will exit the task status report loop.");
+ drop(tasks_status_complete);
return;
}
@@ -346,7 +399,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskRunnerPool<T,
break;
}
Err(TryRecvError::Disconnected) => {
- info!("Channel is closed and will exit the loop");
+ info!("Channel is closed and will exit the task status report loop");
+ drop(tasks_status_complete);
return;
}
}
@@ -368,16 +422,29 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskRunnerPool<T,
//2. loop for task fetching and running
let executor_server = self.executor_server.clone();
+ let mut task_runner_shutdown = shutdown_noti.subscribe_for_shutdown();
+ let task_runner_complete = shutdown_noti.shutdown_complete_tx.clone();
tokio::spawn(async move {
info!("Starting the task runner pool");
+
// Use a dedicated executor for CPU bound tasks so that the main tokio
// executor can still answer requests even when under load
let dedicated_executor = DedicatedExecutor::new(
"task_runner",
executor_server.executor.concurrent_tasks,
);
- loop {
- if let Some(task) = rx_task.recv().await {
+
+ // As long as the shutdown notification has not been received
+ while !task_runner_shutdown.is_shutdown() {
+ let maybe_task = tokio::select! {
+ task = rx_task.recv() => task,
+ _ = task_runner_shutdown.recv() => {
+ info!("Stop the task runner pool");
+ drop(task_runner_complete);
+ return;
+ }
+ };
+ if let Some(task) = maybe_task {
if let Some(task_id) = &task.task_id {
let task_id_log = format!(
"{}/{}/{}",
@@ -398,7 +465,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskRunnerPool<T,
error!("There's no task id in the task definition {:?}", task);
}
} else {
- info!("Channel is closed and will exit the loop");
+ info!("Channel is closed and will exit the task receive loop");
+ drop(task_runner_complete);
return;
}
}
@@ -424,8 +492,17 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorGrpc
async fn stop_executor(
&self,
- _request: Request<StopExecutorParams>,
+ request: Request<StopExecutorParams>,
) -> Result<Response<StopExecutorResult>, Status> {
- todo!()
+ let stop_request = request.into_inner();
+ let stop_reason = stop_request.reason;
+ let force = stop_request.force;
+ info!(
+ "Receive stop executor request, reason: {:?}, force {:?}",
+ stop_reason, force
+ );
+ let stop_sender = self.executor_env.tx_stop.clone();
+ stop_sender.send(force).await.unwrap();
+ Ok(Response::new(StopExecutorResult {}))
}
}
diff --git a/ballista/rust/executor/src/flight_service.rs b/ballista/rust/executor/src/flight_service.rs
index 31fd8b00..09c92ca0 100644
--- a/ballista/rust/executor/src/flight_service.rs
+++ b/ballista/rust/executor/src/flight_service.rs
@@ -19,9 +19,7 @@
use std::fs::File;
use std::pin::Pin;
-use std::sync::Arc;
-use crate::executor::Executor;
use arrow_flight::SchemaAsIpc;
use ballista_core::error::BallistaError;
use ballista_core::serde::decode_protobuf;
@@ -52,14 +50,17 @@ type FlightDataReceiver = Receiver<Result<FlightData, Status>>;
/// Service implementing the Apache Arrow Flight Protocol
#[derive(Clone)]
-pub struct BallistaFlightService {
- /// Executor
- _executor: Arc<Executor>,
-}
+pub struct BallistaFlightService {}
impl BallistaFlightService {
- pub fn new(_executor: Arc<Executor>) -> Self {
- Self { _executor }
+ pub fn new() -> Self {
+ Self {}
+ }
+}
+
+impl Default for BallistaFlightService {
+ fn default() -> Self {
+ Self::new()
}
}
diff --git a/ballista/rust/executor/src/lib.rs b/ballista/rust/executor/src/lib.rs
index e93993b6..88578cbd 100644
--- a/ballista/rust/executor/src/lib.rs
+++ b/ballista/rust/executor/src/lib.rs
@@ -23,6 +23,7 @@ pub mod executor;
pub mod executor_server;
pub mod flight_service;
pub mod metrics;
+pub mod shutdown;
mod cpu_bound_executor;
mod standalone;
diff --git a/ballista/rust/executor/src/main.rs b/ballista/rust/executor/src/main.rs
index 882600e8..ee7dfdd1 100644
--- a/ballista/rust/executor/src/main.rs
+++ b/ballista/rust/executor/src/main.rs
@@ -18,6 +18,7 @@
//! Ballista Rust executor binary.
use chrono::{DateTime, Duration, Utc};
+use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration as Core_Duration;
@@ -27,6 +28,7 @@ use ballista_executor::{execution_loop, executor_server};
use log::{error, info};
use tempfile::TempDir;
use tokio::fs::ReadDir;
+use tokio::signal;
use tokio::{fs, time};
use uuid::Uuid;
@@ -43,9 +45,15 @@ use ballista_core::{print_version, BALLISTA_VERSION};
use ballista_executor::executor::Executor;
use ballista_executor::flight_service::BallistaFlightService;
use ballista_executor::metrics::LoggingMetricsCollector;
+use ballista_executor::shutdown::Shutdown;
+use ballista_executor::shutdown::ShutdownNotifier;
use config::prelude::*;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion_proto::protobuf::LogicalPlanNode;
+use futures::stream::FuturesUnordered;
+use futures::StreamExt;
+use tokio::sync::mpsc;
+use tokio::task::JoinHandle;
#[macro_use]
extern crate configure_me;
@@ -154,57 +162,154 @@ async fn main() -> Result<()> {
let scheduler_policy = opt.task_scheduling_policy;
let cleanup_ttl = opt.executor_cleanup_ttl;
+ // Graceful shutdown notification
+ let shutdown_noti = ShutdownNotifier::new();
+
if opt.executor_cleanup_enable {
let mut interval_time =
time::interval(Core_Duration::from_secs(opt.executor_cleanup_interval));
+ let mut shuffle_cleaner_shutdown = shutdown_noti.subscribe_for_shutdown();
+ let shuffle_cleaner_complete = shutdown_noti.shutdown_complete_tx.clone();
tokio::spawn(async move {
- loop {
- interval_time.tick().await;
- if let Err(e) =
- clean_shuffle_data_loop(&work_dir, cleanup_ttl as i64).await
- {
- error!("Ballista executor fail to clean_shuffle_data {:?}", e)
- }
+ // As long as the shutdown notification has not been received
+ while !shuffle_cleaner_shutdown.is_shutdown() {
+ tokio::select! {
+ _ = interval_time.tick() => {
+ if let Err(e) = clean_shuffle_data_loop(&work_dir, cleanup_ttl as i64).await
+ {
+ error!("Ballista executor fail to clean_shuffle_data {:?}", e)
+ }
+ },
+ _ = shuffle_cleaner_shutdown.recv() => {
+ if let Err(e) = clean_all_shuffle_data(&work_dir).await
+ {
+ error!("Ballista executor fail to clean_shuffle_data {:?}", e)
+ } else {
+ info!("Shuffle data cleaned.");
+ }
+ drop(shuffle_cleaner_complete);
+ return;
+ }
+ };
}
});
}
+ let mut service_handlers: FuturesUnordered<JoinHandle<Result<(), BallistaError>>> =
+ FuturesUnordered::new();
+
+ // Channel used to receive stop requests from the Executor gRPC service.
+ let (stop_send, mut stop_recv) = mpsc::channel::<bool>(10);
+
match scheduler_policy {
TaskSchedulingPolicy::PushStaged => {
- tokio::spawn(executor_server::startup(
- scheduler,
- executor.clone(),
- default_codec,
- ));
+ service_handlers.push(
+ // If executor registration fails during startup, return the error and stop early.
+ executor_server::startup(
+ scheduler,
+ executor.clone(),
+ default_codec,
+ stop_send,
+ &shutdown_noti,
+ )
+ .await?,
+ );
}
_ => {
- tokio::spawn(execution_loop::poll_loop(
+ service_handlers.push(tokio::spawn(execution_loop::poll_loop(
scheduler,
executor.clone(),
default_codec,
- ));
+ )));
}
+ };
+ service_handlers.push(tokio::spawn(flight_server_run(
+ addr,
+ shutdown_noti.subscribe_for_shutdown(),
+ )));
+
+ // Concurrently check the services, listen for the ctrl-c `shutdown` signal, and wait for an incoming stop request.
+ // `check_services` runs until an error is encountered or all services finish, so under normal circumstances this
+ // `select!` statement runs until the `shutdown` signal is received or a stop request arrives.
+ tokio::select! {
+ service_val = check_services(&mut service_handlers) => {
+ info!("services stopped with reason {:?}", service_val);
+ },
+ _ = signal::ctrl_c() => {
+ // NOTE(review): this message has sometimes been observed not to be logged; cause unknown.
+ info!("received ctrl-c event.");
+ },
+ _ = stop_recv.recv() => {},
}
- // Arrow flight service
- {
- let service = BallistaFlightService::new(executor.clone());
- let server = FlightServiceServer::new(service);
- info!(
- "Ballista v{} Rust Executor listening on {:?}",
- BALLISTA_VERSION, addr
- );
- let server_future =
- tokio::spawn(create_grpc_server().add_service(server).serve(addr));
- server_future
- .await
- .context("Tokio error")?
- .context("Could not start executor server")?;
- }
-
+ // Extract the `shutdown_complete` receiver and transmitter, then
+ // explicitly drop `notify_shutdown` and `shutdown_complete_tx`. This is important, as the
+ // `.await` below would otherwise never complete.
+ let ShutdownNotifier {
+ mut shutdown_complete_rx,
+ shutdown_complete_tx,
+ notify_shutdown,
+ ..
+ } = shutdown_noti;
+
+ // When `notify_shutdown` is dropped, all components which have `subscribe`d will
+ // receive the shutdown signal and can exit
+ drop(notify_shutdown);
+ // Drop final `Sender` so the `Receiver` below can complete
+ drop(shutdown_complete_tx);
+
+ // Wait for all related components to finish the shutdown processing.
+ let _ = shutdown_complete_rx.recv().await;
+ info!("Executor stopped.");
Ok(())
}
+/// Runs the Arrow Flight service on `addr` until `grpc_shutdown` is notified.
+async fn flight_server_run(
+ addr: SocketAddr,
+ mut grpc_shutdown: Shutdown,
+) -> Result<(), BallistaError> {
+ let service = BallistaFlightService::new();
+ let server = FlightServiceServer::new(service);
+ info!(
+ "Ballista v{} Rust Executor Flight Server listening on {:?}",
+ BALLISTA_VERSION, addr
+ );
+
+ let shutdown_signal = grpc_shutdown.recv();
+ let server_future = create_grpc_server()
+ .add_service(server)
+ .serve_with_shutdown(addr, shutdown_signal);
+
+ server_future.await.map_err(|e| {
+ error!("Tonic error, Could not start Executor Flight Server.");
+ BallistaError::TonicError(e)
+ })
+}
+
+/// Waits on the long-running services: returns the first service or join error, or `Ok(())` once all have finished.
+async fn check_services(
+ service_handlers: &mut FuturesUnordered<JoinHandle<Result<(), BallistaError>>>,
+) -> Result<(), BallistaError> {
+ loop {
+ match service_handlers.next().await {
+ Some(result) => match result {
+ // React to "inner_result", i.e. propagate as BallistaError
+ Ok(inner_result) => match inner_result {
+ Ok(()) => (),
+ Err(e) => return Err(e),
+ },
+ // React to JoinError
+ Err(e) => return Err(BallistaError::TokioError(e)),
+ },
+ None => {
+ info!("service handlers are all done with their work!");
+ return Ok(());
+ }
+ }
+ }
+}
+
/// This function is scheduled periodically to clean up the executor's work_dir.
/// It only cleans the directories under work_dir, not files.
async fn clean_shuffle_data_loop(work_dir: &str, seconds: i64) -> Result<()> {
@@ -243,6 +348,28 @@ async fn clean_shuffle_data_loop(work_dir: &str, seconds: i64) -> Result<()> {
Ok(())
}
+/// Deletes every subdirectory of `work_dir` (the job shuffle data on this executor); plain files are kept.
+async fn clean_all_shuffle_data(work_dir: &str) -> Result<()> {
+ let mut dir = fs::read_dir(work_dir).await?;
+ let mut to_deleted = Vec::new();
+ while let Some(child) = dir.next_entry().await? {
+ if let Ok(metadata) = child.metadata().await {
+ // only delete job directories; plain files are left in place
+ if metadata.is_dir() {
+ to_deleted.push(child.path().into_os_string())
+ }
+ } else {
+ error!("Can not get metadata from file: {:?}", child)
+ }
+ }
+
+ info!("The work_dir {:?} will be deleted", &to_deleted);
+ for del in to_deleted {
+ fs::remove_dir_all(del).await?;
+ }
+ Ok(())
+}
+
/// Determines if a directory all files are older than cutoff seconds.
async fn check_modified_time_in_dirs(
mut vec: Vec<ReadDir>,
diff --git a/ballista/rust/executor/src/shutdown.rs b/ballista/rust/executor/src/shutdown.rs
new file mode 100644
index 00000000..76ff289f
--- /dev/null
+++ b/ballista/rust/executor/src/shutdown.rs
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use tokio::sync::{broadcast, mpsc};
+
+/// Listens for the server shutdown signal (copied from the mini-redis example).
+///
+/// Shutdown is signalled using a `broadcast::Receiver`: either a value is sent
+/// or the `broadcast::Sender` is dropped (which `recv` observes as `Closed`).
+/// Once either happens, the server should shut down.
+///
+/// The `Shutdown` struct listens for the signal and tracks that the signal has
+/// been received. Callers may query for whether the shutdown signal has been
+/// received or not.
+#[derive(Debug)]
+pub struct Shutdown {
+ /// `true` if the shutdown signal has been received
+ shutdown: bool,
+
+ /// The receive half of the channel used to listen for shutdown.
+ notify: broadcast::Receiver<()>,
+}
+
+impl Shutdown {
+ /// Create a new `Shutdown` backed by the given `broadcast::Receiver`.
+ pub fn new(notify: broadcast::Receiver<()>) -> Shutdown {
+ Shutdown {
+ shutdown: false,
+ notify,
+ }
+ }
+
+ /// Returns `true` if the shutdown signal has been received.
+ pub fn is_shutdown(&self) -> bool {
+ self.shutdown
+ }
+
+ /// Receive the shutdown notice, waiting if necessary.
+ pub async fn recv(&mut self) {
+ // If the shutdown signal has already been received, then return
+ // immediately.
+ if self.shutdown {
+ return;
+ }
+
+ // Any outcome (a value, `Lagged`, or `Closed` once the sender is dropped) means shutdown.
+ let _ = self.notify.recv().await;
+
+ // Remember that the signal has been received.
+ self.shutdown = true;
+ }
+}
+
+pub struct ShutdownNotifier {
+ /// Broadcasts a shutdown signal to all related components.
+ pub notify_shutdown: broadcast::Sender<()>,
+
+ /// Used as part of the graceful shutdown process to wait for
+ /// related components to complete processing.
+ ///
+ /// Tokio channels are closed once all `Sender` handles go out of scope.
+ /// When a channel is closed, the receiver receives `None`. This is
+ /// leveraged to detect all shutdown processing completing.
+ pub shutdown_complete_rx: mpsc::Receiver<()>,
+
+ pub shutdown_complete_tx: mpsc::Sender<()>,
+}
+
+impl ShutdownNotifier {
+ /// Create a new ShutdownNotifier instance
+ pub fn new() -> Self {
+ let (notify_shutdown, _) = broadcast::channel(1);
+ let (shutdown_complete_tx, shutdown_complete_rx) = mpsc::channel(1);
+ Self {
+ notify_shutdown,
+ shutdown_complete_rx,
+ shutdown_complete_tx,
+ }
+ }
+
+ /// Subscribe for shutdown notification
+ pub fn subscribe_for_shutdown(&self) -> Shutdown {
+ Shutdown::new(self.notify_shutdown.subscribe())
+ }
+}
+
+impl Default for ShutdownNotifier {
+ fn default() -> Self {
+ ShutdownNotifier::new()
+ }
+}
diff --git a/ballista/rust/executor/src/standalone.rs b/ballista/rust/executor/src/standalone.rs
index ca5513fa..33690031 100644
--- a/ballista/rust/executor/src/standalone.rs
+++ b/ballista/rust/executor/src/standalone.rs
@@ -82,7 +82,7 @@ pub async fn new_standalone_executor<
concurrent_tasks,
));
- let service = BallistaFlightService::new(executor.clone());
+ let service = BallistaFlightService::new();
let server = FlightServiceServer::new(service);
tokio::spawn(
create_grpc_server()