Posted to commits@arrow.apache.org by ag...@apache.org on 2022/08/22 04:33:12 UTC

[arrow-ballista] branch master updated: Stop Executor Impl, Executor Graceful Shutdown (#151)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f80b42a Stop Executor Impl, Executor Graceful Shutdown (#151)
3f80b42a is described below

commit 3f80b42aca15380c5dfd053b53753067e57801f8
Author: mingmwang <mi...@ebay.com>
AuthorDate: Mon Aug 22 12:33:08 2022 +0800

    Stop Executor Impl, Executor Graceful Shutdown (#151)
---
 ballista/rust/core/proto/ballista.proto       |   4 +
 ballista/rust/executor/src/execution_loop.rs  |   9 +-
 ballista/rust/executor/src/executor_server.rs | 135 +++++++++++++++----
 ballista/rust/executor/src/flight_service.rs  |  17 +--
 ballista/rust/executor/src/lib.rs             |   1 +
 ballista/rust/executor/src/main.rs            | 187 +++++++++++++++++++++-----
 ballista/rust/executor/src/shutdown.rs        | 105 +++++++++++++++
 ballista/rust/executor/src/standalone.rs      |   2 +-
 8 files changed, 387 insertions(+), 73 deletions(-)

diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 7472d7ae..0ebc922d 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -681,6 +681,10 @@ message HeartBeatResult {
 }
 
 message StopExecutorParams {
+  // stop reason
+  string reason = 1;
+  // whether to force the executor to stop immediately
+  bool force = 2;
 }
 
 message StopExecutorResult {
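
For context, here is a minimal caller-side sketch of the new stop RPC. It is not part of this commit; it assumes the tonic-generated client `ExecutorGrpcClient` is exported under `ballista_core::serde::protobuf`, and the endpoint address is a placeholder.

    use ballista_core::serde::protobuf::{
        executor_grpc_client::ExecutorGrpcClient, StopExecutorParams,
    };

    async fn request_stop() -> Result<(), Box<dyn std::error::Error>> {
        // Placeholder address for a running executor's gRPC endpoint.
        let mut client = ExecutorGrpcClient::connect("http://localhost:50052").await?;
        // `force: false` asks for a graceful shutdown; `force: true` would
        // request an immediate stop.
        client
            .stop_executor(StopExecutorParams {
                reason: "manual maintenance".to_string(),
                force: false,
            })
            .await?;
        Ok(())
    }
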
diff --git a/ballista/rust/executor/src/execution_loop.rs b/ballista/rust/executor/src/execution_loop.rs
index 471377e4..f7e029c2 100644
--- a/ballista/rust/executor/src/execution_loop.rs
+++ b/ballista/rust/executor/src/execution_loop.rs
@@ -32,7 +32,7 @@ use ballista_core::utils::collect_plan_metrics;
 use datafusion::execution::context::TaskContext;
 use datafusion_proto::logical_plan::AsLogicalPlan;
 use futures::FutureExt;
-use log::{debug, error, info, trace, warn};
+use log::{debug, error, info, warn};
 use std::any::Any;
 use std::collections::HashMap;
 use std::convert::TryInto;
@@ -47,7 +47,7 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
     mut scheduler: SchedulerGrpcClient<Channel>,
     executor: Arc<Executor>,
     codec: BallistaCodec<T, U>,
-) {
+) -> Result<(), BallistaError> {
     let executor_specification: ExecutorSpecification = executor
         .metadata
         .specification
@@ -59,10 +59,9 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
         Arc::new(AtomicUsize::new(executor_specification.task_slots as usize));
     let (task_status_sender, mut task_status_receiver) =
         std::sync::mpsc::channel::<TaskStatus>();
+    info!("Starting poll work loop with scheduler");
 
     loop {
-        trace!("Starting registration loop with scheduler");
-
         let task_status: Vec<TaskStatus> =
             sample_tasks_status(&mut task_status_receiver).await;
 
@@ -108,7 +107,7 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
                 }
             }
             Err(error) => {
-                warn!("Executor registration failed. If this continues to happen the executor might be marked as dead by the scheduler. Error: {}", error);
+                warn!("Executor poll work loop failed. If this continues to happen, the scheduler might be marked as dead. Error: {}", error);
             }
         }
         if !active_job {
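
Changing `poll_loop` to return `Result<(), BallistaError>` lets the binary supervise it through a `JoinHandle` instead of letting the task panic. A self-contained sketch of that pattern (the name `spawn_service` and the error payload are illustrative, not from this commit):

    use std::time::Duration;
    use tokio::task::JoinHandle;

    // A fallible service loop whose errors surface through the JoinHandle
    // rather than as a panic.
    fn spawn_service() -> JoinHandle<Result<(), String>> {
        tokio::spawn(async move {
            for attempt in 1..=3 {
                // ... poll some external system; pretend every attempt fails ...
                eprintln!("poll attempt {} failed, retrying", attempt);
                tokio::time::sleep(Duration::from_millis(100)).await;
            }
            Err("scheduler unreachable after 3 attempts".to_string())
        })
    }

    #[tokio::main]
    async fn main() {
        match spawn_service().await {
            Ok(Ok(())) => println!("service loop ended cleanly"),
            Ok(Err(e)) => eprintln!("service loop ended with error: {}", e),
            Err(join_err) => eprintln!("service task panicked: {}", join_err),
        }
    }
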
diff --git a/ballista/rust/executor/src/executor_server.rs b/ballista/rust/executor/src/executor_server.rs
index 591c88a1..8f0e758f 100644
--- a/ballista/rust/executor/src/executor_server.rs
+++ b/ballista/rust/executor/src/executor_server.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use ballista_core::BALLISTA_VERSION;
 use std::collections::HashMap;
 use std::convert::TryInto;
 use std::ops::Deref;
@@ -45,16 +46,22 @@ use datafusion::execution::context::TaskContext;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion_proto::logical_plan::AsLogicalPlan;
 use tokio::sync::mpsc::error::TryRecvError;
+use tokio::task::JoinHandle;
 
 use crate::as_task_status;
 use crate::cpu_bound_executor::DedicatedExecutor;
 use crate::executor::Executor;
+use crate::shutdown::ShutdownNotifier;
+
+type ServerHandle = JoinHandle<Result<(), BallistaError>>;
 
 pub async fn startup<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
     mut scheduler: SchedulerGrpcClient<Channel>,
     executor: Arc<Executor>,
     codec: BallistaCodec<T, U>,
-) {
+    stop_send: mpsc::Sender<bool>,
+    shutdown_noti: &ShutdownNotifier,
+) -> Result<ServerHandle, BallistaError> {
     // TODO make the buffer size configurable
     let (tx_task, rx_task) = mpsc::channel::<TaskDefinition>(1000);
     let (tx_task_status, rx_task_status) = mpsc::channel::<TaskStatus>(1000);
@@ -65,12 +72,13 @@ pub async fn startup<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
         ExecutorEnv {
             tx_task,
             tx_task_status,
+            tx_stop: stop_send,
         },
         codec,
     );
 
     // 1. Start executor grpc service
-    {
+    let server = {
         let executor_meta = executor.metadata.clone();
         let addr = format!(
             "{}:{}",
@@ -83,36 +91,53 @@ pub async fn startup<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
             executor_meta.grpc_port
         );
         let addr = addr.parse().unwrap();
-        info!("Setup executor grpc service for {:?}", addr);
 
+        info!(
+            "Ballista v{} Rust Executor Grpc Server listening on {:?}",
+            BALLISTA_VERSION, addr
+        );
         let server = ExecutorGrpcServer::new(executor_server.clone());
-        let grpc_server_future = create_grpc_server().add_service(server).serve(addr);
-        tokio::spawn(async move { grpc_server_future.await });
-    }
-
-    let executor_server = Arc::new(executor_server);
+        let mut grpc_shutdown = shutdown_noti.subscribe_for_shutdown();
+        tokio::spawn(async move {
+            let shutdown_signal = grpc_shutdown.recv();
+            let grpc_server_future = create_grpc_server()
+                .add_service(server)
+                .serve_with_shutdown(addr, shutdown_signal);
+            grpc_server_future.await.map_err(|e| {
+                error!("Tonic error, Could not start Executor Grpc Server.");
+                BallistaError::TonicError(e)
+            })
+        })
+    };
 
     // 2. Do executor registration
+    // TODO: executor registration should happen only after the executor grpc server has started.
+    let executor_server = Arc::new(executor_server);
     match register_executor(&mut scheduler, executor.clone()).await {
         Ok(_) => {
             info!("Executor registration succeed");
         }
         Err(error) => {
-            panic!("Executor registration failed due to: {}", error);
+            error!("Executor registration failed due to: {}", error);
+            // Abort the executor grpc server future
+            server.abort();
+            return Err(error);
         }
     };
 
-    // 3. Start Heartbeater
+    // 3. Start Heartbeater loop
     {
         let heartbeater = Heartbeater::new(executor_server.clone());
-        heartbeater.start().await;
+        heartbeater.start(shutdown_noti);
     }
 
-    // 4. Start TaskRunnerPool
+    // 4. Start TaskRunnerPool loop
     {
         let task_runner_pool = TaskRunnerPool::new(executor_server.clone());
-        task_runner_pool.start(rx_task, rx_task_status).await;
+        task_runner_pool.start(rx_task, rx_task_status, shutdown_noti);
     }
+
+    Ok(server)
 }
 
 #[allow(clippy::clone_on_copy)]
@@ -149,6 +174,8 @@ struct ExecutorEnv {
     tx_task: mpsc::Sender<TaskDefinition>,
     /// Receives `TaskStatus` from the CPU-bound task pool `dedicated_executor`, then sends it back to the scheduler via RPC.
     tx_task_status: mpsc::Sender<TaskStatus>,
+    /// Forwards stop requests received over RPC to the main loop.
+    tx_stop: mpsc::Sender<bool>,
 }
 
 unsafe impl Sync for ExecutorEnv {}
@@ -280,6 +307,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
     }
 }
 
+/// Heartbeater runs until a shutdown notification is received.
 struct Heartbeater<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> {
     executor_server: Arc<ExecutorServer<T, U>>,
 }
@@ -289,18 +317,32 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> Heartbeater<T, U>
         Self { executor_server }
     }
 
-    async fn start(&self) {
+    fn start(&self, shutdown_noti: &ShutdownNotifier) {
         let executor_server = self.executor_server.clone();
+        let mut heartbeat_shutdown = shutdown_noti.subscribe_for_shutdown();
+        let heartbeat_complete = shutdown_noti.shutdown_complete_tx.clone();
         tokio::spawn(async move {
             info!("Starting heartbeater to send heartbeat the scheduler periodically");
-            loop {
+            // As long as the shutdown notification has not been received
+            while !heartbeat_shutdown.is_shutdown() {
                 executor_server.heartbeat().await;
-                tokio::time::sleep(Duration::from_millis(60000)).await;
+                tokio::select! {
+                    _ = tokio::time::sleep(Duration::from_millis(60000)) => {},
+                    _ = heartbeat_shutdown.recv() => {
+                        info!("Stop heartbeater");
+                        drop(heartbeat_complete);
+                        return;
+                    }
+                };
             }
         });
     }
 }
 
+/// There are two loops (futures) running separately in the tokio runtime.
+/// The first sends task statuses back to the scheduler.
+/// The second receives tasks from the scheduler and runs them.
+/// Both loops run until a shutdown notification is received.
 struct TaskRunnerPool<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> {
     executor_server: Arc<ExecutorServer<T, U>>,
 }
@@ -310,25 +352,36 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskRunnerPool<T,
         Self { executor_server }
     }
 
-    // There are two loop(future) running separately in tokio runtime.
-    // First is for sending back task status to scheduler
-    // Second is for receiving task from scheduler and run
-    async fn start(
+    fn start(
         &self,
         mut rx_task: mpsc::Receiver<TaskDefinition>,
         mut rx_task_status: mpsc::Receiver<TaskStatus>,
+        shutdown_noti: &ShutdownNotifier,
     ) {
         //1. loop for task status reporting
         let executor_server = self.executor_server.clone();
+        let mut tasks_status_shutdown = shutdown_noti.subscribe_for_shutdown();
+        let tasks_status_complete = shutdown_noti.shutdown_complete_tx.clone();
         tokio::spawn(async move {
             info!("Starting the task status reporter");
-            loop {
+            // As long as the shutdown notification has not been received
+            while !tasks_status_shutdown.is_shutdown() {
                 let mut tasks_status = vec![];
                 // First try to fetch task status from the channel in *blocking* mode
-                if let Some(task_status) = rx_task_status.recv().await {
+                let maybe_task_status = tokio::select! {
+                    task_status = rx_task_status.recv() => task_status,
+                    _ = tasks_status_shutdown.recv() => {
+                        info!("Stop task status reporting loop");
+                        drop(tasks_status_complete);
+                        return;
+                    }
+                };
+
+                if let Some(task_status) = maybe_task_status {
                     tasks_status.push(task_status);
                 } else {
-                    info!("Channel is closed and will exit the loop");
+                    info!("Channel is closed and will exit the task status report loop.");
+                    drop(tasks_status_complete);
                     return;
                 }
 
@@ -346,7 +399,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskRunnerPool<T,
                             break;
                         }
                         Err(TryRecvError::Disconnected) => {
-                            info!("Channel is closed and will exit the loop");
+                            info!("Channel is closed and will exit the task status report loop");
+                            drop(tasks_status_complete);
                             return;
                         }
                     }
@@ -368,16 +422,29 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskRunnerPool<T,
 
         //2. loop for task fetching and running
         let executor_server = self.executor_server.clone();
+        let mut task_runner_shutdown = shutdown_noti.subscribe_for_shutdown();
+        let task_runner_complete = shutdown_noti.shutdown_complete_tx.clone();
         tokio::spawn(async move {
             info!("Starting the task runner pool");
+
             // Use a dedicated executor for CPU bound tasks so that the main tokio
             // executor can still answer requests even when under load
             let dedicated_executor = DedicatedExecutor::new(
                 "task_runner",
                 executor_server.executor.concurrent_tasks,
             );
-            loop {
-                if let Some(task) = rx_task.recv().await {
+
+            // As long as the shutdown notification has not been received
+            while !task_runner_shutdown.is_shutdown() {
+                let maybe_task = tokio::select! {
+                    task = rx_task.recv() => task,
+                    _ = task_runner_shutdown.recv() => {
+                        info!("Stop the task runner pool");
+                        drop(task_runner_complete);
+                        return;
+                    }
+                };
+                if let Some(task) = maybe_task {
                     if let Some(task_id) = &task.task_id {
                         let task_id_log = format!(
                             "{}/{}/{}",
@@ -398,7 +465,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskRunnerPool<T,
                         error!("There's no task id in the task definition {:?}", task);
                     }
                 } else {
-                    info!("Channel is closed and will exit the loop");
+                    info!("Channel is closed and will exit the task receive loop");
+                    drop(task_runner_complete);
                     return;
                 }
             }
@@ -424,8 +492,17 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorGrpc
 
     async fn stop_executor(
         &self,
-        _request: Request<StopExecutorParams>,
+        request: Request<StopExecutorParams>,
     ) -> Result<Response<StopExecutorResult>, Status> {
-        todo!()
+        let stop_request = request.into_inner();
+        let stop_reason = stop_request.reason;
+        let force = stop_request.force;
+        info!(
+            "Receive stop executor request, reason: {:?}, force {:?}",
+            stop_reason, force
+        );
+        let stop_sender = self.executor_env.tx_stop.clone();
+        stop_sender.send(force).await.map_err(|e| {
+            Status::internal(format!("Failed to forward the stop request: {}", e))
+        })?;
+        Ok(Response::new(StopExecutorResult {}))
     }
 }
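
The `Heartbeater` and `TaskRunnerPool` loops above share one shape: hold a clone of the shutdown-complete `Sender`, `select!` between useful work and the shutdown receiver, and drop the clone on exit so the coordinator's final `recv()` can complete. A stripped-down sketch of that shape using only tokio primitives (names are illustrative):

    use std::time::Duration;
    use tokio::sync::{broadcast, mpsc};

    async fn run_loop(
        mut shutdown: broadcast::Receiver<()>,
        complete_tx: mpsc::Sender<()>,
    ) {
        loop {
            tokio::select! {
                _ = tokio::time::sleep(Duration::from_secs(60)) => {
                    // ... one unit of periodic work, e.g. send a heartbeat ...
                }
                _ = shutdown.recv() => {
                    // Dropping our Sender clone lets the coordinator's
                    // mpsc::Receiver observe that this loop has finished.
                    drop(complete_tx);
                    return;
                }
            }
        }
    }
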
diff --git a/ballista/rust/executor/src/flight_service.rs b/ballista/rust/executor/src/flight_service.rs
index 31fd8b00..09c92ca0 100644
--- a/ballista/rust/executor/src/flight_service.rs
+++ b/ballista/rust/executor/src/flight_service.rs
@@ -19,9 +19,7 @@
 
 use std::fs::File;
 use std::pin::Pin;
-use std::sync::Arc;
 
-use crate::executor::Executor;
 use arrow_flight::SchemaAsIpc;
 use ballista_core::error::BallistaError;
 use ballista_core::serde::decode_protobuf;
@@ -52,14 +50,17 @@ type FlightDataReceiver = Receiver<Result<FlightData, Status>>;
 
 /// Service implementing the Apache Arrow Flight Protocol
 #[derive(Clone)]
-pub struct BallistaFlightService {
-    /// Executor
-    _executor: Arc<Executor>,
-}
+pub struct BallistaFlightService {}
 
 impl BallistaFlightService {
-    pub fn new(_executor: Arc<Executor>) -> Self {
-        Self { _executor }
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl Default for BallistaFlightService {
+    fn default() -> Self {
+        Self::new()
     }
 }
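
With the unused `Executor` handle removed, the flight service needs no construction context, and the added `Default` impl is the usual way to satisfy clippy's `new_without_default` lint. A trivial usage sketch (import paths assume this repo's crate layout):

    use arrow_flight::flight_service_server::FlightServiceServer;
    use ballista_executor::flight_service::BallistaFlightService;

    fn build_flight_server() -> FlightServiceServer<BallistaFlightService> {
        // Either constructor works now that no Executor handle is required.
        FlightServiceServer::new(BallistaFlightService::default())
    }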
 
diff --git a/ballista/rust/executor/src/lib.rs b/ballista/rust/executor/src/lib.rs
index e93993b6..88578cbd 100644
--- a/ballista/rust/executor/src/lib.rs
+++ b/ballista/rust/executor/src/lib.rs
@@ -23,6 +23,7 @@ pub mod executor;
 pub mod executor_server;
 pub mod flight_service;
 pub mod metrics;
+pub mod shutdown;
 
 mod cpu_bound_executor;
 mod standalone;
diff --git a/ballista/rust/executor/src/main.rs b/ballista/rust/executor/src/main.rs
index 882600e8..ee7dfdd1 100644
--- a/ballista/rust/executor/src/main.rs
+++ b/ballista/rust/executor/src/main.rs
@@ -18,6 +18,7 @@
 //! Ballista Rust executor binary.
 
 use chrono::{DateTime, Duration, Utc};
+use std::net::SocketAddr;
 use std::sync::Arc;
 use std::time::Duration as Core_Duration;
 
@@ -27,6 +28,7 @@ use ballista_executor::{execution_loop, executor_server};
 use log::{error, info};
 use tempfile::TempDir;
 use tokio::fs::ReadDir;
+use tokio::signal;
 use tokio::{fs, time};
 use uuid::Uuid;
 
@@ -43,9 +45,15 @@ use ballista_core::{print_version, BALLISTA_VERSION};
 use ballista_executor::executor::Executor;
 use ballista_executor::flight_service::BallistaFlightService;
 use ballista_executor::metrics::LoggingMetricsCollector;
+use ballista_executor::shutdown::Shutdown;
+use ballista_executor::shutdown::ShutdownNotifier;
 use config::prelude::*;
 use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
 use datafusion_proto::protobuf::LogicalPlanNode;
+use futures::stream::FuturesUnordered;
+use futures::StreamExt;
+use tokio::sync::mpsc;
+use tokio::task::JoinHandle;
 
 #[macro_use]
 extern crate configure_me;
@@ -154,57 +162,154 @@ async fn main() -> Result<()> {
     let scheduler_policy = opt.task_scheduling_policy;
     let cleanup_ttl = opt.executor_cleanup_ttl;
 
+    // Graceful shutdown notification
+    let shutdown_noti = ShutdownNotifier::new();
+
     if opt.executor_cleanup_enable {
         let mut interval_time =
             time::interval(Core_Duration::from_secs(opt.executor_cleanup_interval));
+        let mut shuffle_cleaner_shutdown = shutdown_noti.subscribe_for_shutdown();
+        let shuffle_cleaner_complete = shutdown_noti.shutdown_complete_tx.clone();
         tokio::spawn(async move {
-            loop {
-                interval_time.tick().await;
-                if let Err(e) =
-                    clean_shuffle_data_loop(&work_dir, cleanup_ttl as i64).await
-                {
-                    error!("Ballista executor fail to clean_shuffle_data {:?}", e)
-                }
+            // As long as the shutdown notification has not been received
+            while !shuffle_cleaner_shutdown.is_shutdown() {
+                tokio::select! {
+                    _ = interval_time.tick() => {
+                        if let Err(e) =
+                            clean_shuffle_data_loop(&work_dir, cleanup_ttl as i64).await
+                        {
+                            error!("Ballista executor failed to clean shuffle data: {:?}", e)
+                        }
+                    },
+                    _ = shuffle_cleaner_shutdown.recv() => {
+                        if let Err(e) = clean_all_shuffle_data(&work_dir).await {
+                            error!("Ballista executor failed to clean shuffle data: {:?}", e)
+                        } else {
+                            info!("Shuffle data cleaned.");
+                        }
+                        drop(shuffle_cleaner_complete);
+                        return;
+                    }
+                };
             }
         });
     }
 
+    let mut service_handlers: FuturesUnordered<JoinHandle<Result<(), BallistaError>>> =
+        FuturesUnordered::new();
+
+    // Channels used to receive stop requests from Executor grpc service.
+    let (stop_send, mut stop_recv) = mpsc::channel::<bool>(10);
+
     match scheduler_policy {
         TaskSchedulingPolicy::PushStaged => {
-            tokio::spawn(executor_server::startup(
-                scheduler,
-                executor.clone(),
-                default_codec,
-            ));
+            service_handlers.push(
+                // If executor registration fails during startup, return the error and stop early.
+                executor_server::startup(
+                    scheduler,
+                    executor.clone(),
+                    default_codec,
+                    stop_send,
+                    &shutdown_noti,
+                )
+                .await?,
+            );
         }
         _ => {
-            tokio::spawn(execution_loop::poll_loop(
+            service_handlers.push(tokio::spawn(execution_loop::poll_loop(
                 scheduler,
                 executor.clone(),
                 default_codec,
-            ));
+            )));
         }
+    };
+    service_handlers.push(tokio::spawn(flight_server_run(
+        addr,
+        shutdown_noti.subscribe_for_shutdown(),
+    )));
+
+    // Concurrently run the service checks, listen for the `shutdown` signal, and wait for a stop request.
+    // `check_services` runs until an error is encountered, so under normal circumstances this `select!`
+    // statement runs until the `shutdown` signal is received or a stop request arrives.
+    tokio::select! {
+        service_val = check_services(&mut service_handlers) => {
+             info!("services stopped with reason {:?}", service_val);
+        },
+        _ = signal::ctrl_c() => {
+             // sometimes OS can not log ??
+             info!("received ctrl-c event.");
+        },
+        _ = stop_recv.recv() => {},
     }
 
-    // Arrow flight service
-    {
-        let service = BallistaFlightService::new(executor.clone());
-        let server = FlightServiceServer::new(service);
-        info!(
-            "Ballista v{} Rust Executor listening on {:?}",
-            BALLISTA_VERSION, addr
-        );
-        let server_future =
-            tokio::spawn(create_grpc_server().add_service(server).serve(addr));
-        server_future
-            .await
-            .context("Tokio error")?
-            .context("Could not start executor server")?;
-    }
-
+    // Extract the `shutdown_complete` receiver and transmitter, and
+    // explicitly drop the transmitter. This is important, as the
+    // `.await` below would otherwise never complete.
+    let ShutdownNotifier {
+        mut shutdown_complete_rx,
+        shutdown_complete_tx,
+        notify_shutdown,
+        ..
+    } = shutdown_noti;
+
+    // When `notify_shutdown` is dropped, all components which have `subscribe`d will
+    // receive the shutdown signal and can exit
+    drop(notify_shutdown);
+    // Drop final `Sender` so the `Receiver` below can complete
+    drop(shutdown_complete_tx);
+
+    // Wait for all related components to finish the shutdown processing.
+    let _ = shutdown_complete_rx.recv().await;
+    info!("Executor stopped.");
     Ok(())
 }
 
+// Arrow flight service
+async fn flight_server_run(
+    addr: SocketAddr,
+    mut grpc_shutdown: Shutdown,
+) -> Result<(), BallistaError> {
+    let service = BallistaFlightService::new();
+    let server = FlightServiceServer::new(service);
+    info!(
+        "Ballista v{} Rust Executor Flight Server listening on {:?}",
+        BALLISTA_VERSION, addr
+    );
+
+    let shutdown_signal = grpc_shutdown.recv();
+    let server_future = create_grpc_server()
+        .add_service(server)
+        .serve_with_shutdown(addr, shutdown_signal);
+
+    server_future.await.map_err(|e| {
+        error!("Tonic error, Could not start Executor Flight Server.");
+        BallistaError::TonicError(e)
+    })
+}
+
+// Check the status of long-running services
+async fn check_services(
+    service_handlers: &mut FuturesUnordered<JoinHandle<Result<(), BallistaError>>>,
+) -> Result<(), BallistaError> {
+    loop {
+        match service_handlers.next().await {
+            Some(result) => match result {
+                // React to "inner_result", i.e. propagate as BallistaError
+                Ok(inner_result) => match inner_result {
+                    Ok(()) => (),
+                    Err(e) => return Err(e),
+                },
+                // React to JoinError
+                Err(e) => return Err(BallistaError::TokioError(e)),
+            },
+            None => {
+                info!("service handlers are all done with their work!");
+                return Ok(());
+            }
+        }
+    }
+}
+
 /// This function is scheduled to run periodically to clean up the executor's shuffle data.
 /// It only cleans directories under the work_dir, not individual files.
 async fn clean_shuffle_data_loop(work_dir: &str, seconds: i64) -> Result<()> {
@@ -243,6 +348,28 @@ async fn clean_shuffle_data_loop(work_dir: &str, seconds: i64) -> Result<()> {
     Ok(())
 }
 
+/// This function will clean up all shuffle data on this executor
+async fn clean_all_shuffle_data(work_dir: &str) -> Result<()> {
+    let mut dir = fs::read_dir(work_dir).await?;
+    let mut to_delete = Vec::new();
+    while let Some(child) = dir.next_entry().await? {
+        if let Ok(metadata) = child.metadata().await {
+            // Only delete job directories, not loose files
+            if metadata.is_dir() {
+                to_delete.push(child.path().into_os_string())
+            }
+        } else {
+            error!("Cannot get metadata for file: {:?}", child)
+        }
+    }
+
+    info!("Deleting job directories under the work_dir: {:?}", &to_delete);
+    for path in to_delete {
+        fs::remove_dir_all(path).await?;
+    }
+    Ok(())
+}
+
 /// Determines whether all files in a directory are older than the cutoff in seconds.
 async fn check_modified_time_in_dirs(
     mut vec: Vec<ReadDir>,
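
Taken together, the main-side orchestration is: fan out loops that each hold a completion `Sender`, `select!` on the first stop trigger, broadcast shutdown, then await the completion `Receiver`. A compact, self-contained sketch of that flow using only tokio (worker bodies are illustrative):

    use tokio::sync::{broadcast, mpsc};

    #[tokio::main]
    async fn main() {
        let (notify_shutdown, _) = broadcast::channel::<()>(1);
        let (complete_tx, mut complete_rx) = mpsc::channel::<()>(1);

        // Fan out workers; each holds a completion Sender clone.
        for id in 0..3 {
            let mut shutdown = notify_shutdown.subscribe();
            let complete = complete_tx.clone();
            tokio::spawn(async move {
                let _ = shutdown.recv().await; // wait for the broadcast to close
                println!("worker {} shutting down", id);
                drop(complete); // closing our Sender signals completion
            });
        }

        // First stop trigger wins; here we only wait for ctrl-c.
        let _ = tokio::signal::ctrl_c().await;

        // Broadcast shutdown by dropping the Sender, then drop our own
        // completion Sender so recv() below can return None.
        drop(notify_shutdown);
        drop(complete_tx);
        let _ = complete_rx.recv().await;
        println!("all workers stopped");
    }
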
diff --git a/ballista/rust/executor/src/shutdown.rs b/ballista/rust/executor/src/shutdown.rs
new file mode 100644
index 00000000..76ff289f
--- /dev/null
+++ b/ballista/rust/executor/src/shutdown.rs
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use tokio::sync::{broadcast, mpsc};
+
+/// Listens for the server shutdown signal (copied from the mini-redis example).
+///
+/// Shutdown is signalled using a `broadcast::Receiver`. Only a single value is
+/// ever sent. Once a value has been sent via the broadcast channel, the server
+/// should shut down.
+///
+/// The `Shutdown` struct listens for the signal and tracks that the signal has
+/// been received. Callers may query for whether the shutdown signal has been
+/// received or not.
+#[derive(Debug)]
+pub struct Shutdown {
+    /// `true` if the shutdown signal has been received
+    shutdown: bool,
+
+    /// The receive half of the channel used to listen for shutdown.
+    notify: broadcast::Receiver<()>,
+}
+
+impl Shutdown {
+    /// Create a new `Shutdown` backed by the given `broadcast::Receiver`.
+    pub fn new(notify: broadcast::Receiver<()>) -> Shutdown {
+        Shutdown {
+            shutdown: false,
+            notify,
+        }
+    }
+
+    /// Returns `true` if the shutdown signal has been received.
+    pub fn is_shutdown(&self) -> bool {
+        self.shutdown
+    }
+
+    /// Receive the shutdown notice, waiting if necessary.
+    pub async fn recv(&mut self) {
+        // If the shutdown signal has already been received, then return
+        // immediately.
+        if self.shutdown {
+            return;
+        }
+
+        // Cannot receive a "lag error" as only one value is ever sent.
+        let _ = self.notify.recv().await;
+
+        // Remember that the signal has been received.
+        self.shutdown = true;
+    }
+}
+
+pub struct ShutdownNotifier {
+    /// Broadcasts a shutdown signal to all related components.
+    pub notify_shutdown: broadcast::Sender<()>,
+
+    /// Used as part of the graceful shutdown process to wait for
+    /// related components to complete processing.
+    ///
+    /// Tokio channels are closed once all `Sender` handles go out of scope.
+    /// When a channel is closed, the receiver receives `None`. This is
+    /// leveraged to detect when all shutdown processing has completed.
+    pub shutdown_complete_rx: mpsc::Receiver<()>,
+
+    pub shutdown_complete_tx: mpsc::Sender<()>,
+}
+
+impl ShutdownNotifier {
+    /// Create a new ShutdownNotifier instance
+    pub fn new() -> Self {
+        let (notify_shutdown, _) = broadcast::channel(1);
+        let (shutdown_complete_tx, shutdown_complete_rx) = mpsc::channel(1);
+        Self {
+            notify_shutdown,
+            shutdown_complete_rx,
+            shutdown_complete_tx,
+        }
+    }
+
+    /// Subscribe for shutdown notification
+    pub fn subscribe_for_shutdown(&self) -> Shutdown {
+        Shutdown::new(self.notify_shutdown.subscribe())
+    }
+}
+
+impl Default for ShutdownNotifier {
+    fn default() -> Self {
+        ShutdownNotifier::new()
+    }
+}
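
A small usage sketch against the API above (the worker body and timings are illustrative; the import path assumes this crate's module layout):

    use std::time::Duration;
    use ballista_executor::shutdown::ShutdownNotifier;

    #[tokio::main]
    async fn main() {
        let notifier = ShutdownNotifier::new();
        let mut shutdown = notifier.subscribe_for_shutdown();

        let worker = tokio::spawn(async move {
            // Race work against the shutdown signal until it fires.
            while !shutdown.is_shutdown() {
                tokio::select! {
                    _ = tokio::time::sleep(Duration::from_millis(200)) => println!("tick"),
                    _ = shutdown.recv() => println!("shutdown received"),
                }
            }
        });

        tokio::time::sleep(Duration::from_millis(500)).await;
        // Dropping the broadcast Sender closes the channel, which
        // `Shutdown::recv` interprets as the shutdown signal.
        drop(notifier.notify_shutdown);
        worker.await.unwrap();
    }
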
diff --git a/ballista/rust/executor/src/standalone.rs b/ballista/rust/executor/src/standalone.rs
index ca5513fa..33690031 100644
--- a/ballista/rust/executor/src/standalone.rs
+++ b/ballista/rust/executor/src/standalone.rs
@@ -82,7 +82,7 @@ pub async fn new_standalone_executor<
         concurrent_tasks,
     ));
 
-    let service = BallistaFlightService::new(executor.clone());
+    let service = BallistaFlightService::new();
     let server = FlightServiceServer::new(service);
     tokio::spawn(
         create_grpc_server()