You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/10/05 01:31:24 UTC

[arrow-ballista] branch master updated: REST API to get query stages (#305)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new aae3f3cc REST API to get query stages (#305)
aae3f3cc is described below

commit aae3f3cce8b1735e79cd4bd2f15fab1483d07ccb
Author: Andy Grove <an...@gmail.com>
AuthorDate: Tue Oct 4 19:31:20 2022 -0600

    REST API to get query stages (#305)
---
 ballista/rust/scheduler/src/api/handlers.rs | 119 +++++++++++++++++++++++++---
 ballista/rust/scheduler/src/api/mod.rs      |   7 +-
 2 files changed, 111 insertions(+), 15 deletions(-)

diff --git a/ballista/rust/scheduler/src/api/handlers.rs b/ballista/rust/scheduler/src/api/handlers.rs
index 7ed22c34..807d27a5 100644
--- a/ballista/rust/scheduler/src/api/handlers.rs
+++ b/ballista/rust/scheduler/src/api/handlers.rs
@@ -11,14 +11,17 @@
 // limitations under the License.
 
 use crate::scheduler_server::SchedulerServer;
+use crate::state::execution_graph::ExecutionStage;
 use crate::state::execution_graph_dot::ExecutionGraphDot;
 use ballista_core::serde::protobuf::job_status::Status;
 use ballista_core::serde::AsExecutionPlan;
 use ballista_core::BALLISTA_VERSION;
+use datafusion::physical_plan::metrics::{MetricValue, MetricsSet, Time};
 use datafusion_proto::logical_plan::AsLogicalPlan;
 use graphviz_rust::cmd::{CommandArg, Format};
 use graphviz_rust::exec;
 use graphviz_rust::printer::PrinterContext;
+use std::time::Duration;
 use warp::Rejection;
 
 #[derive(Debug, serde::Serialize)]
@@ -50,6 +53,15 @@ pub struct JobResponse {
     pub percent_complete: u8,
 }
 
+#[derive(Debug, serde::Serialize)]
+pub struct QueryStageSummary {
+    pub stage_id: String,
+    pub stage_status: String,
+    pub input_rows: usize,
+    pub output_rows: usize,
+    pub elapsed_compute: String,
+}
+
 /// Return current scheduler state
 pub(crate) async fn get_scheduler_state<T: AsLogicalPlan, U: AsExecutionPlan>(
     data_server: SchedulerServer<T, U>,
@@ -146,33 +158,116 @@ pub(crate) async fn get_jobs<T: AsLogicalPlan, U: AsExecutionPlan>(
 }
 
 #[derive(Debug, serde::Serialize)]
-pub struct JobSummaryResponse {
-    /// Just show debug output for now but what we really want here is a list of stages with
-    /// plans and metrics and the relationship between them
-    pub summary: String,
+pub struct QueryStagesResponse {
+    pub stages: Vec<QueryStageSummary>,
 }
 
 /// Get the execution graph for the specified job id
-pub(crate) async fn get_job_summary<T: AsLogicalPlan, U: AsExecutionPlan>(
+pub(crate) async fn get_query_stages<T: AsLogicalPlan, U: AsExecutionPlan>(
     data_server: SchedulerServer<T, U>,
     job_id: String,
 ) -> Result<impl warp::Reply, Rejection> {
-    let graph = data_server
+    let maybe_graph = data_server
         .state
         .task_manager
         .get_job_execution_graph(&job_id)
         .await
         .map_err(|_| warp::reject())?;
 
-    match graph {
-        Some(x) => Ok(warp::reply::json(&JobSummaryResponse {
-            summary: format!("{:?}", x),
-        })),
-        _ => Ok(warp::reply::json(&JobSummaryResponse {
-            summary: "Not Found".to_string(),
+    match maybe_graph {
+        Some(graph) => Ok(warp::reply::json(&QueryStagesResponse {
+            stages: graph
+                .stages()
+                .iter()
+                .map(|(id, stage)| {
+                    let mut summary = QueryStageSummary {
+                        stage_id: id.to_string(),
+                        stage_status: "".to_string(),
+                        input_rows: 0,
+                        output_rows: 0,
+                        elapsed_compute: "".to_string(),
+                    };
+                    match stage {
+                        ExecutionStage::UnResolved(_) => {
+                            summary.stage_status = "Unresolved".to_string();
+                        }
+                        ExecutionStage::Resolved(_) => {
+                            summary.stage_status = "Resolved".to_string();
+                        }
+                        ExecutionStage::Running(running_stage) => {
+                            summary.stage_status = "Running".to_string();
+                            summary.input_rows = running_stage
+                                .stage_metrics
+                                .as_ref()
+                                .map(|m| get_combined_count(m.as_slice(), "input_rows"))
+                                .unwrap_or(0);
+                            summary.output_rows = running_stage
+                                .stage_metrics
+                                .as_ref()
+                                .map(|m| get_combined_count(m.as_slice(), "output_rows"))
+                                .unwrap_or(0);
+                            summary.elapsed_compute = running_stage
+                                .stage_metrics
+                                .as_ref()
+                                .map(|m| get_elapsed_compute_nanos(m.as_slice()))
+                                .unwrap_or_default();
+                        }
+                        ExecutionStage::Successful(completed_stage) => {
+                            summary.stage_status = "Completed".to_string();
+                            summary.input_rows = get_combined_count(
+                                &completed_stage.stage_metrics,
+                                "input_rows",
+                            );
+                            summary.output_rows = get_combined_count(
+                                &completed_stage.stage_metrics,
+                                "output_rows",
+                            );
+                            summary.elapsed_compute =
+                                get_elapsed_compute_nanos(&completed_stage.stage_metrics);
+                        }
+                        ExecutionStage::Failed(_) => {
+                            summary.stage_status = "Failed".to_string();
+                        }
+                    }
+                    summary
+                })
+                .collect(),
         })),
+        _ => Ok(warp::reply::json(&QueryStagesResponse { stages: vec![] })),
     }
 }
+
+fn get_elapsed_compute_nanos(metrics: &[MetricsSet]) -> String {
+    let nanos: usize = metrics
+        .iter()
+        .flat_map(|vec| {
+            vec.iter().map(|metric| match metric.as_ref().value() {
+                MetricValue::ElapsedCompute(time) => time.value(),
+                _ => 0,
+            })
+        })
+        .sum();
+    let t = Time::new();
+    t.add_duration(Duration::from_nanos(nanos as u64));
+    t.to_string()
+}
+
+fn get_combined_count(metrics: &[MetricsSet], name: &str) -> usize {
+    metrics
+        .iter()
+        .flat_map(|vec| {
+            vec.iter().map(|metric| {
+                let metric_value = metric.value();
+                if metric_value.name() == name {
+                    metric_value.as_usize()
+                } else {
+                    0
+                }
+            })
+        })
+        .sum()
+}
+
 /// Generate a dot graph for the specified job id and return as plain text
 pub(crate) async fn get_job_dot_graph<T: AsLogicalPlan, U: AsExecutionPlan>(
     data_server: SchedulerServer<T, U>,
diff --git a/ballista/rust/scheduler/src/api/mod.rs b/ballista/rust/scheduler/src/api/mod.rs
index 42518a47..17eec052 100644
--- a/ballista/rust/scheduler/src/api/mod.rs
+++ b/ballista/rust/scheduler/src/api/mod.rs
@@ -97,13 +97,14 @@ pub fn get_routes<T: AsLogicalPlan + Clone, U: 'static + AsExecutionPlan>(
         .and(with_data_server(scheduler_server.clone()))
         .and_then(|data_server| handlers::get_jobs(data_server));
 
-    let route_job_summary = warp::path!("api" / "job" / String)
+    let route_query_stages = warp::path!("api" / "job" / String / "stages")
         .and(with_data_server(scheduler_server.clone()))
-        .and_then(|job_id, data_server| handlers::get_job_summary(data_server, job_id));
+        .and_then(|job_id, data_server| handlers::get_query_stages(data_server, job_id));
 
     let route_job_dot = warp::path!("api" / "job" / String / "dot")
         .and(with_data_server(scheduler_server.clone()))
         .and_then(|job_id, data_server| handlers::get_job_dot_graph(data_server, job_id));
+
     let route_job_dot_svg = warp::path!("api" / "job" / String / "dot_svg")
         .and(with_data_server(scheduler_server))
         .and_then(|job_id, data_server| handlers::get_job_svg_graph(data_server, job_id));
@@ -111,7 +112,7 @@ pub fn get_routes<T: AsLogicalPlan + Clone, U: 'static + AsExecutionPlan>(
     let routes = route_scheduler_state
         .or(route_executors)
         .or(route_jobs)
-        .or(route_job_summary)
+        .or(route_query_stages)
         .or(route_job_dot)
         .or(route_job_dot_svg);
     routes.boxed()