You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "avantgardnerio (via GitHub)" <gi...@apache.org> on 2023/02/24 21:41:35 UTC

[GitHub] [arrow-ballista] avantgardnerio commented on a diff in pull request #667: Add executor terminating status for graceful shutdown

avantgardnerio commented on code in PR #667:
URL: https://github.com/apache/arrow-ballista/pull/667#discussion_r1117683204


##########
ballista/scheduler/src/scheduler_server/mod.rs:
##########
@@ -216,83 +220,113 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
     fn expire_dead_executors(&self) -> Result<()> {
         let state = self.state.clone();
         let event_sender = self.query_stage_event_loop.get_sender()?;
+        let termination_grace_period = self.executor_termination_grace_period;
         tokio::task::spawn(async move {
             loop {
-                let expired_executors = state.executor_manager.get_expired_executors();
+                let expired_executors = state
+                    .executor_manager
+                    .get_expired_executors(termination_grace_period);
                 for expired in expired_executors {
                     let executor_id = expired.executor_id.clone();
                     let executor_manager = state.executor_manager.clone();
-                    let stop_reason = format!(
-                        "Executor {} heartbeat timed out after {}s",
-                        executor_id.clone(),
-                        DEFAULT_EXECUTOR_TIMEOUT_SECONDS
-                    );
-                    warn!("{}", stop_reason.clone());
+
                     let sender_clone = event_sender.clone();
+
+                    let terminating = matches!(
+                        expired
+                            .status
+                            .as_ref()
+                            .and_then(|status| status.status.as_ref()),
+                        Some(ballista_core::serde::protobuf::executor_status::Status::Terminating(_))
+                    );
+
+                    let stop_reason = if terminating {

Review Comment:
   Seems logical...



##########
ballista/scheduler/src/scheduler_server/grpc.rs:
##########
@@ -760,15 +763,22 @@ mod test {
             .await
             .expect("getting executor");
 
+        let is_stopped = await_condition(Duration::from_millis(10), 5, || {
+            futures::future::ready(Ok(state.executor_manager.is_dead_executor("abc")))
+        })
+        .await?;
+
         // executor should be marked to dead
-        assert!(state.executor_manager.is_dead_executor("abc"));
+        assert!(is_stopped, "Executor not marked dead after 50ms");

Review Comment:
   Nice test



##########
ballista/scheduler/src/scheduler_server/mod.rs:
##########
@@ -216,83 +220,113 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
     fn expire_dead_executors(&self) -> Result<()> {
         let state = self.state.clone();
         let event_sender = self.query_stage_event_loop.get_sender()?;
+        let termination_grace_period = self.executor_termination_grace_period;
         tokio::task::spawn(async move {
             loop {
-                let expired_executors = state.executor_manager.get_expired_executors();
+                let expired_executors = state
+                    .executor_manager
+                    .get_expired_executors(termination_grace_period);
                 for expired in expired_executors {
                     let executor_id = expired.executor_id.clone();
                     let executor_manager = state.executor_manager.clone();
-                    let stop_reason = format!(
-                        "Executor {} heartbeat timed out after {}s",
-                        executor_id.clone(),
-                        DEFAULT_EXECUTOR_TIMEOUT_SECONDS
-                    );
-                    warn!("{}", stop_reason.clone());
+
                     let sender_clone = event_sender.clone();
+
+                    let terminating = matches!(
+                        expired
+                            .status
+                            .as_ref()
+                            .and_then(|status| status.status.as_ref()),
+                        Some(ballista_core::serde::protobuf::executor_status::Status::Terminating(_))
+                    );
+
+                    let stop_reason = if terminating {
+                        format!(
+                        "TERMINATING executor {executor_id} heartbeat timed out after {termination_grace_period}s"
+                    )
+                    } else {
+                        format!(
+                            "ACTIVE executor {executor_id} heartbeat timed out after {DEFAULT_EXECUTOR_TIMEOUT_SECONDS}s",
+                        )
+                    };
+
+                    warn!("{stop_reason}");
+
+                    // If executor is expired, remove it immediately
                     Self::remove_executor(
                         executor_manager,
                         sender_clone,
                         &executor_id,
                         Some(stop_reason.clone()),
-                    )
-                    .await
-                    .unwrap_or_else(|e| {
-                        let msg =
-                            format!("Error to remove Executor in Scheduler due to {e:?}");
-                        error!("{}", msg);
-                    });
+                        0,
+                    );
 
-                    match state.executor_manager.get_client(&executor_id).await {
-                        Ok(mut client) => {
-                            tokio::task::spawn(async move {
-                                match client
-                                    .stop_executor(StopExecutorParams {
-                                        executor_id,
-                                        reason: stop_reason,
-                                        force: true,
-                                    })
-                                    .await
-                                {
-                                    Err(error) => {
-                                        warn!(
+                    // If executor is not already terminating then stop it. If it is terminating then it should already be shutting
+                    // down and we do not need to do anything here.
+                    if !terminating {
+                        match state.executor_manager.get_client(&executor_id).await {
+                            Ok(mut client) => {
+                                tokio::task::spawn(async move {
+                                    match client
+                                        .stop_executor(StopExecutorParams {
+                                            executor_id,
+                                            reason: stop_reason,
+                                            force: true,
+                                        })
+                                        .await
+                                    {
+                                        Err(error) => {
+                                            warn!(
                                             "Failed to send stop_executor rpc due to, {}",
                                             error
                                         );
+                                        }
+                                        Ok(_value) => {}
                                     }
-                                    Ok(_value) => {}
-                                }
-                            });
-                        }
-                        Err(_) => {
-                            warn!("Executor is already dead, failed to connect to Executor {}", executor_id);
+                                });
+                            }
+                            Err(_) => {
+                                warn!("Executor is already dead, failed to connect to Executor {}", executor_id);
+                            }
                         }
                     }
                 }
-                tokio::time::sleep(Duration::from_secs(DEFAULT_EXECUTOR_TIMEOUT_SECONDS))
-                    .await;
+                tokio::time::sleep(Duration::from_secs(
+                    EXPIRE_DEAD_EXECUTOR_INTERVAL_SECS,
+                ))
+                .await;
             }
         });
         Ok(())
     }
 
-    pub(crate) async fn remove_executor(
+    pub(crate) fn remove_executor(
         executor_manager: ExecutorManager,
         event_sender: EventSender<QueryStageSchedulerEvent>,
         executor_id: &str,
         reason: Option<String>,
-    ) -> Result<()> {
-        // Update the executor manager immediately here
-        executor_manager
-            .remove_executor(executor_id, reason.clone())
-            .await?;
+        wait_secs: u64,
+    ) {
+        let executor_id = executor_id.to_owned();
+        tokio::spawn(async move {

Review Comment:
   I was worried that spawning every time without backpressure from blocking or an idempotency check could cause a storm of `ExecutorLost` events, but it seems like this is handled by:
   
   ```
   self.cluster_state.remove_executor(executor_id).await?;
   ...
   self.executors_heartbeat.remove(&executor_id);
   ...
   self.dead_executors.insert(executor_id);
   ```
   
   ?
   
   It's not always easy to get a picture of the mental swim lane diagram of how all these pieces of state, messages, and events interact.



##########
ballista/executor/src/executor_process.rs:
##########
@@ -319,7 +324,41 @@ pub async fn start_executor_process(opt: ExecutorProcessConfig) -> Result<()> {
         },
     };
 
+    // Set status to fenced
+    info!("setting executor to TERMINATING status");
+    TERMINATING.store(true, Ordering::Release);
+
     if notify_scheduler {
+        // Send a heartbeat to update status of executor to `Fenced`. This should signal to the
+        // scheduler to no longer scheduler tasks on this executor

Review Comment:
   ```suggestion
           // scheduler to no longer schedule tasks on this executor
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org