You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2021/04/01 17:49:08 UTC

[spark] branch master updated: [SPARK-26399][WEBUI][CORE] Add new stage-level REST APIs and parameters

This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2796812  [SPARK-26399][WEBUI][CORE] Add new stage-level REST APIs and parameters
2796812 is described below

commit 2796812cea343f700c2009db9ce733b4f048ecd0
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Thu Apr 1 12:48:26 2021 -0500

    [SPARK-26399][WEBUI][CORE] Add new stage-level REST APIs and parameters
    
    ### What changes were proposed in this pull request?
    Add more flexible parameters for the stage endpoint
    /applications/{app-id}/stages.  It can be:
    
    /applications/{app-id}/stages?details=[true|false]&status=[ACTIVE|COMPLETE|FAILED|PENDING|SKIPPED]&withSummaries=[true|false]&quantiles=[comma separated quantiles string]&taskStatus=[RUNNING|SUCCESS|FAILED|PENDING]
    
    where
    ```
    query parameter details=true is to show the detailed task information within each stage.  The default value is details=false;
    query parameter status selects those stages with the specified status.  When the status parameter is not specified, a list of all stages is generated.
    query parameter withSummaries=true is to show both task summary information in percentile distribution and executor summary information in percentile distribution.  The default value is withSummaries=false.
    query parameter quantiles supports user-defined quantiles; the default is `0.0,0.25,0.5,0.75,1.0`.
    query parameter taskStatus is to show only those tasks with the specified status within their corresponding stages.  This parameter takes effect only when details=true (i.e. it is ignored when details=false).
    ```
    
    ### Why are the changes needed?
    More flexible RESTful API
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    UT
    
    Closes #31204 from AngersZhuuuu/SPARK-26399-NEW.
    
    Lead-authored-by: Angerszhuuuu <an...@gmail.com>
    Co-authored-by: AngersZhuuuu <an...@gmail.com>
    Signed-off-by: Sean Owen <sr...@gmail.com>
---
 .../org/apache/spark/status/AppStatusStore.scala   | 16 ++++++++---
 .../spark/status/api/v1/StagesResource.scala       | 31 ++++++++++++++++++++--
 docs/monitoring.md                                 |  6 ++++-
 3 files changed, 47 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index 1d34d8a..8d9c7cc 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -104,13 +104,23 @@ private[spark] class AppStatusStore(
     listener.map(_.activeStages()).getOrElse(Nil)
   }
 
-  def stageList(statuses: JList[v1.StageStatus]): Seq[v1.StageData] = {
+  def stageList(
+    statuses: JList[v1.StageStatus],
+    details: Boolean = false,
+    withSummaries: Boolean = false,
+    unsortedQuantiles: Array[Double] = Array.empty,
+    taskStatus: JList[v1.TaskStatus] = List().asJava): Seq[v1.StageData] = {
+    val quantiles = unsortedQuantiles.sorted
     val it = store.view(classOf[StageDataWrapper]).reverse().asScala.map(_.info)
-    if (statuses != null && !statuses.isEmpty()) {
+    val ret = if (statuses != null && !statuses.isEmpty()) {
       it.filter { s => statuses.contains(s.status) }.toSeq
     } else {
       it.toSeq
     }
+    ret.map { s =>
+      newStageData(s, withDetail = details, taskStatus = taskStatus,
+        withSummaries = withSummaries, unsortedQuantiles = quantiles)
+    }
   }
 
   def stageData(
@@ -472,7 +482,7 @@ private[spark] class AppStatusStore(
   def newStageData(
     stage: v1.StageData,
     withDetail: Boolean = false,
-    taskStatus: JList[v1.TaskStatus],
+    taskStatus: JList[v1.TaskStatus] = List().asJava,
     withSummaries: Boolean = false,
     unsortedQuantiles: Array[Double] = Array.empty[Double]): v1.StageData = {
     if (!withDetail && !withSummaries) {
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala
index af66a73..26dfa5a 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala
@@ -20,6 +20,9 @@ import java.util.{HashMap, List => JList, Locale}
 import javax.ws.rs.{NotFoundException => _, _}
 import javax.ws.rs.core.{Context, MediaType, MultivaluedMap, UriInfo}
 
+import scala.collection.JavaConverters._
+
+import org.apache.spark.status.api.v1.TaskStatus._
 import org.apache.spark.ui.UIUtils
 import org.apache.spark.ui.jobs.ApiHelper._
 import org.apache.spark.util.Utils
@@ -28,8 +31,32 @@ import org.apache.spark.util.Utils
 private[v1] class StagesResource extends BaseAppResource {
 
   @GET
-  def stageList(@QueryParam("status") statuses: JList[StageStatus]): Seq[StageData] = {
-    withUI(_.store.stageList(statuses))
+  def stageList(
+      @QueryParam("status") statuses: JList[StageStatus],
+      @QueryParam("details") @DefaultValue("false") details: Boolean,
+      @QueryParam("withSummaries") @DefaultValue("false") withSummaries: Boolean,
+      @QueryParam("quantiles") @DefaultValue("0.0,0.25,0.5,0.75,1.0") quantileString: String,
+      @QueryParam("taskStatus") taskStatus: JList[TaskStatus]): Seq[StageData] = {
+    withUI {
+      val quantiles = parseQuantileString(quantileString)
+      ui => {
+        ui.store.stageList(statuses, details, withSummaries, quantiles, taskStatus)
+          .filter { stage =>
+            if (details && taskStatus.asScala.nonEmpty) {
+              taskStatus.asScala.exists {
+                case FAILED => stage.numFailedTasks > 0
+                case KILLED => stage.numKilledTasks > 0
+                case RUNNING => stage.numActiveTasks > 0
+                case SUCCESS => stage.numCompleteTasks > 0
+                case UNKNOWN => stage.numTasks - stage.numFailedTasks - stage.numKilledTasks -
+                  stage.numActiveTasks - stage.numCompleteTasks > 0
+              }
+            } else {
+              true
+            }
+          }
+      }
+    }
   }
 
   @GET
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 6157244..14c8c4f 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -472,7 +472,11 @@ can be identified by their `[attempt-id]`. In the API listed below, when running
     <td><code>/applications/[app-id]/stages</code></td>
     <td>
       A list of all stages for a given application.
-      <br><code>?status=[active|complete|pending|failed]</code> list only stages in the state.
+        <br><code>?status=[active|complete|pending|failed]</code> list only stages in the given state.
+        <br><code>?details=true</code> lists all stages with the task data.
+        <br><code>?taskStatus=[RUNNING|SUCCESS|FAILED|KILLED|PENDING]</code> lists only those tasks with the specified task status within each stage. Query parameter taskStatus takes effect only when <code>details=true</code>.
+        <br><code>?withSummaries=true</code> lists stages with task metrics distribution and executor metrics distribution.
+        <br><code>?quantiles=0.0,0.25,0.5,0.75,1.0</code> summarize the metrics with the given quantiles. Query parameter quantiles takes effect only when <code>withSummaries=true</code>. Default value is <code>0.0,0.25,0.5,0.75,1.0</code>. 
     </td>
   </tr>
   <tr>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org