You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/10/05 01:31:24 UTC
[arrow-ballista] branch master updated: REST API to get query stages (#305)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new aae3f3cc REST API to get query stages (#305)
aae3f3cc is described below
commit aae3f3cce8b1735e79cd4bd2f15fab1483d07ccb
Author: Andy Grove <an...@gmail.com>
AuthorDate: Tue Oct 4 19:31:20 2022 -0600
REST API to get query stages (#305)
---
ballista/rust/scheduler/src/api/handlers.rs | 119 +++++++++++++++++++++++++---
ballista/rust/scheduler/src/api/mod.rs | 7 +-
2 files changed, 111 insertions(+), 15 deletions(-)
diff --git a/ballista/rust/scheduler/src/api/handlers.rs b/ballista/rust/scheduler/src/api/handlers.rs
index 7ed22c34..807d27a5 100644
--- a/ballista/rust/scheduler/src/api/handlers.rs
+++ b/ballista/rust/scheduler/src/api/handlers.rs
@@ -11,14 +11,17 @@
// limitations under the License.
use crate::scheduler_server::SchedulerServer;
+use crate::state::execution_graph::ExecutionStage;
use crate::state::execution_graph_dot::ExecutionGraphDot;
use ballista_core::serde::protobuf::job_status::Status;
use ballista_core::serde::AsExecutionPlan;
use ballista_core::BALLISTA_VERSION;
+use datafusion::physical_plan::metrics::{MetricValue, MetricsSet, Time};
use datafusion_proto::logical_plan::AsLogicalPlan;
use graphviz_rust::cmd::{CommandArg, Format};
use graphviz_rust::exec;
use graphviz_rust::printer::PrinterContext;
+use std::time::Duration;
use warp::Rejection;
#[derive(Debug, serde::Serialize)]
@@ -50,6 +53,15 @@ pub struct JobResponse {
pub percent_complete: u8,
}
+#[derive(Debug, serde::Serialize)]
+pub struct QueryStageSummary {
+ pub stage_id: String,
+ pub stage_status: String,
+ pub input_rows: usize,
+ pub output_rows: usize,
+ pub elapsed_compute: String,
+}
+
/// Return current scheduler state
pub(crate) async fn get_scheduler_state<T: AsLogicalPlan, U: AsExecutionPlan>(
data_server: SchedulerServer<T, U>,
@@ -146,33 +158,116 @@ pub(crate) async fn get_jobs<T: AsLogicalPlan, U: AsExecutionPlan>(
}
#[derive(Debug, serde::Serialize)]
-pub struct JobSummaryResponse {
- /// Just show debug output for now but what we really want here is a list of stages with
- /// plans and metrics and the relationship between them
- pub summary: String,
+pub struct QueryStagesResponse {
+ pub stages: Vec<QueryStageSummary>,
}
/// Get the execution graph for the specified job id
-pub(crate) async fn get_job_summary<T: AsLogicalPlan, U: AsExecutionPlan>(
+pub(crate) async fn get_query_stages<T: AsLogicalPlan, U: AsExecutionPlan>(
data_server: SchedulerServer<T, U>,
job_id: String,
) -> Result<impl warp::Reply, Rejection> {
- let graph = data_server
+ let maybe_graph = data_server
.state
.task_manager
.get_job_execution_graph(&job_id)
.await
.map_err(|_| warp::reject())?;
- match graph {
- Some(x) => Ok(warp::reply::json(&JobSummaryResponse {
- summary: format!("{:?}", x),
- })),
- _ => Ok(warp::reply::json(&JobSummaryResponse {
- summary: "Not Found".to_string(),
+ match maybe_graph {
+ Some(graph) => Ok(warp::reply::json(&QueryStagesResponse {
+ stages: graph
+ .stages()
+ .iter()
+ .map(|(id, stage)| {
+ let mut summary = QueryStageSummary {
+ stage_id: id.to_string(),
+ stage_status: "".to_string(),
+ input_rows: 0,
+ output_rows: 0,
+ elapsed_compute: "".to_string(),
+ };
+ match stage {
+ ExecutionStage::UnResolved(_) => {
+ summary.stage_status = "Unresolved".to_string();
+ }
+ ExecutionStage::Resolved(_) => {
+ summary.stage_status = "Resolved".to_string();
+ }
+ ExecutionStage::Running(running_stage) => {
+ summary.stage_status = "Running".to_string();
+ summary.input_rows = running_stage
+ .stage_metrics
+ .as_ref()
+ .map(|m| get_combined_count(m.as_slice(), "input_rows"))
+ .unwrap_or(0);
+ summary.output_rows = running_stage
+ .stage_metrics
+ .as_ref()
+ .map(|m| get_combined_count(m.as_slice(), "output_rows"))
+ .unwrap_or(0);
+ summary.elapsed_compute = running_stage
+ .stage_metrics
+ .as_ref()
+ .map(|m| get_elapsed_compute_nanos(m.as_slice()))
+ .unwrap_or_default();
+ }
+ ExecutionStage::Successful(completed_stage) => {
+ summary.stage_status = "Completed".to_string();
+ summary.input_rows = get_combined_count(
+ &completed_stage.stage_metrics,
+ "input_rows",
+ );
+ summary.output_rows = get_combined_count(
+ &completed_stage.stage_metrics,
+ "output_rows",
+ );
+ summary.elapsed_compute =
+ get_elapsed_compute_nanos(&completed_stage.stage_metrics);
+ }
+ ExecutionStage::Failed(_) => {
+ summary.stage_status = "Failed".to_string();
+ }
+ }
+ summary
+ })
+ .collect(),
})),
+ _ => Ok(warp::reply::json(&QueryStagesResponse { stages: vec![] })),
}
}
+
+fn get_elapsed_compute_nanos(metrics: &[MetricsSet]) -> String {
+ let nanos: usize = metrics
+ .iter()
+ .flat_map(|vec| {
+ vec.iter().map(|metric| match metric.as_ref().value() {
+ MetricValue::ElapsedCompute(time) => time.value(),
+ _ => 0,
+ })
+ })
+ .sum();
+ let t = Time::new();
+ t.add_duration(Duration::from_nanos(nanos as u64));
+ t.to_string()
+}
+
+fn get_combined_count(metrics: &[MetricsSet], name: &str) -> usize {
+ metrics
+ .iter()
+ .flat_map(|vec| {
+ vec.iter().map(|metric| {
+ let metric_value = metric.value();
+ if metric_value.name() == name {
+ metric_value.as_usize()
+ } else {
+ 0
+ }
+ })
+ })
+ .sum()
+}
+
/// Generate a dot graph for the specified job id and return as plain text
pub(crate) async fn get_job_dot_graph<T: AsLogicalPlan, U: AsExecutionPlan>(
data_server: SchedulerServer<T, U>,
diff --git a/ballista/rust/scheduler/src/api/mod.rs b/ballista/rust/scheduler/src/api/mod.rs
index 42518a47..17eec052 100644
--- a/ballista/rust/scheduler/src/api/mod.rs
+++ b/ballista/rust/scheduler/src/api/mod.rs
@@ -97,13 +97,14 @@ pub fn get_routes<T: AsLogicalPlan + Clone, U: 'static + AsExecutionPlan>(
.and(with_data_server(scheduler_server.clone()))
.and_then(|data_server| handlers::get_jobs(data_server));
- let route_job_summary = warp::path!("api" / "job" / String)
+ let route_query_stages = warp::path!("api" / "job" / String / "stages")
.and(with_data_server(scheduler_server.clone()))
- .and_then(|job_id, data_server| handlers::get_job_summary(data_server, job_id));
+ .and_then(|job_id, data_server| handlers::get_query_stages(data_server, job_id));
let route_job_dot = warp::path!("api" / "job" / String / "dot")
.and(with_data_server(scheduler_server.clone()))
.and_then(|job_id, data_server| handlers::get_job_dot_graph(data_server, job_id));
+
let route_job_dot_svg = warp::path!("api" / "job" / String / "dot_svg")
.and(with_data_server(scheduler_server))
.and_then(|job_id, data_server| handlers::get_job_svg_graph(data_server, job_id));
@@ -111,7 +112,7 @@ pub fn get_routes<T: AsLogicalPlan + Clone, U: 'static + AsExecutionPlan>(
let routes = route_scheduler_state
.or(route_executors)
.or(route_jobs)
- .or(route_job_summary)
+ .or(route_query_stages)
.or(route_job_dot)
.or(route_job_dot_svg);
routes.boxed()