You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this rendering.
Posted to commits@arrow.apache.org by th...@apache.org on 2023/01/05 17:43:06 UTC

[arrow-ballista] 01/01: Handle job resubmission

This is an automated email from the ASF dual-hosted git repository.

thinkharderdev pushed a commit to branch issue-585
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git

commit 94a1716a20852a735ab74f13107453bcf33b1364
Author: Dan Harris <da...@thinkharder.dev>
AuthorDate: Thu Jan 5 12:42:50 2023 -0500

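    Handle job resubmission: when push-staged scheduling reserves no task
    slots for a submitted job, re-post the JobSubmitted event (with
    resubmit: true) after 200ms instead of offering an empty reservation
    set; submission metrics and the "submitted" log line are recorded only
    on the first (non-resubmitted) event.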
---
 ballista/scheduler/src/scheduler_server/event.rs   |  1 +
 .../src/scheduler_server/query_stage_scheduler.rs  | 60 +++++++++++++++++-----
 2 files changed, 48 insertions(+), 13 deletions(-)

diff --git a/ballista/scheduler/src/scheduler_server/event.rs b/ballista/scheduler/src/scheduler_server/event.rs
index 74643535..21ce7fbb 100644
--- a/ballista/scheduler/src/scheduler_server/event.rs
+++ b/ballista/scheduler/src/scheduler_server/event.rs
@@ -37,6 +37,7 @@ pub enum QueryStageSchedulerEvent {
         job_id: String,
         queued_at: u64,
         submitted_at: u64,
+        resubmit: bool,
     },
     // For a job which failed during planning
     JobPlanningFailed {
diff --git a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
index 29f3d2d7..89202fe8 100644
--- a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -17,6 +17,7 @@
 
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
+use std::time::Duration;
 
 use async_trait::async_trait;
 use log::{debug, error, info};
@@ -120,6 +121,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
                             job_id,
                             queued_at,
                             submitted_at: timestamp_millis(),
+                            resubmit: false,
                         }
                     };
                     tx_event
@@ -133,11 +135,20 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
                 job_id,
                 queued_at,
                 submitted_at,
+                resubmit,
             } => {
-                self.metrics_collector
-                    .record_submitted(&job_id, queued_at, submitted_at);
+                if !resubmit {
+                    self.metrics_collector.record_submitted(
+                        &job_id,
+                        queued_at,
+                        submitted_at,
+                    );
+
+                    info!("Job {} submitted", job_id);
+                } else {
+                    debug!("Job {} resubmitted", job_id);
+                }
 
-                info!("Job {} submitted", job_id);
                 if self.state.config.is_push_staged_scheduling() {
                     let available_tasks = self
                         .state
@@ -154,17 +165,40 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
                         .map(|res| res.assign(job_id.clone()))
                         .collect();
 
-                    debug!(
-                        "Reserved {} task slots for submitted job {}",
-                        reservations.len(),
-                        job_id
-                    );
+                    if reservations.is_empty() {
+                        debug!(
+                            "No task slots reserved for job {}, resubmitting after 200ms",
+                            job_id
+                        );
 
-                    tx_event
-                        .post_event(QueryStageSchedulerEvent::ReservationOffering(
-                            reservations,
-                        ))
-                        .await?;
+                        tokio::task::spawn(async move {
+                            tokio::time::sleep(Duration::from_millis(200)).await;
+
+                            if let Err(e) = tx_event
+                                .post_event(QueryStageSchedulerEvent::JobSubmitted {
+                                    job_id,
+                                    queued_at,
+                                    submitted_at,
+                                    resubmit: true,
+                                })
+                                .await
+                            {
+                                error!("error resubmitting job: {}", e);
+                            }
+                        });
+                    } else {
+                        debug!(
+                            "Reserved {} task slots for submitted job {}",
+                            reservations.len(),
+                            job_id
+                        );
+
+                        tx_event
+                            .post_event(QueryStageSchedulerEvent::ReservationOffering(
+                                reservations,
+                            ))
+                            .await?;
+                    }
                 }
             }
             QueryStageSchedulerEvent::JobPlanningFailed {