You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by th...@apache.org on 2023/01/05 17:43:06 UTC
[arrow-ballista] 01/01: Handle job resubmission
This is an automated email from the ASF dual-hosted git repository.
thinkharderdev pushed a commit to branch issue-585
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
commit 94a1716a20852a735ab74f13107453bcf33b1364
Author: Dan Harris <da...@thinkharder.dev>
AuthorDate: Thu Jan 5 12:42:50 2023 -0500
Handle job resubmission
---
ballista/scheduler/src/scheduler_server/event.rs | 1 +
.../src/scheduler_server/query_stage_scheduler.rs | 60 +++++++++++++++++-----
2 files changed, 48 insertions(+), 13 deletions(-)
diff --git a/ballista/scheduler/src/scheduler_server/event.rs b/ballista/scheduler/src/scheduler_server/event.rs
index 74643535..21ce7fbb 100644
--- a/ballista/scheduler/src/scheduler_server/event.rs
+++ b/ballista/scheduler/src/scheduler_server/event.rs
@@ -37,6 +37,7 @@ pub enum QueryStageSchedulerEvent {
job_id: String,
queued_at: u64,
submitted_at: u64,
+ resubmit: bool,
},
// For a job which failed during planning
JobPlanningFailed {
diff --git a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
index 29f3d2d7..89202fe8 100644
--- a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -17,6 +17,7 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
+use std::time::Duration;
use async_trait::async_trait;
use log::{debug, error, info};
@@ -120,6 +121,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
job_id,
queued_at,
submitted_at: timestamp_millis(),
+ resubmit: false,
}
};
tx_event
@@ -133,11 +135,20 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
job_id,
queued_at,
submitted_at,
+ resubmit,
} => {
- self.metrics_collector
- .record_submitted(&job_id, queued_at, submitted_at);
+ if !resubmit {
+ self.metrics_collector.record_submitted(
+ &job_id,
+ queued_at,
+ submitted_at,
+ );
+
+ info!("Job {} submitted", job_id);
+ } else {
+ debug!("Job {} resubmitted", job_id);
+ }
- info!("Job {} submitted", job_id);
if self.state.config.is_push_staged_scheduling() {
let available_tasks = self
.state
@@ -154,17 +165,40 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
.map(|res| res.assign(job_id.clone()))
.collect();
- debug!(
- "Reserved {} task slots for submitted job {}",
- reservations.len(),
- job_id
- );
+ if reservations.is_empty() {
+ debug!(
+ "No task slots reserved for job {}, resubmitting after 200ms",
+ job_id
+ );
- tx_event
- .post_event(QueryStageSchedulerEvent::ReservationOffering(
- reservations,
- ))
- .await?;
+ tokio::task::spawn(async move {
+ tokio::time::sleep(Duration::from_millis(200)).await;
+
+ if let Err(e) = tx_event
+ .post_event(QueryStageSchedulerEvent::JobSubmitted {
+ job_id,
+ queued_at,
+ submitted_at,
+ resubmit: true,
+ })
+ .await
+ {
+ error!("error resubmitting job: {}", e);
+ }
+ });
+ } else {
+ debug!(
+ "Reserved {} task slots for submitted job {}",
+ reservations.len(),
+ job_id
+ );
+
+ tx_event
+ .post_event(QueryStageSchedulerEvent::ReservationOffering(
+ reservations,
+ ))
+ .await?;
+ }
}
}
QueryStageSchedulerEvent::JobPlanningFailed {