You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2021/04/01 17:49:08 UTC
[spark] branch master updated: [SPARK-26399][WEBUI][CORE] Add new
stage-level REST APIs and parameters
This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 2796812 [SPARK-26399][WEBUI][CORE] Add new stage-level REST APIs and parameters
2796812 is described below
commit 2796812cea343f700c2009db9ce733b4f048ecd0
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Thu Apr 1 12:48:26 2021 -0500
[SPARK-26399][WEBUI][CORE] Add new stage-level REST APIs and parameters
### What changes were proposed in this pull request?
Add more flexible query parameters to the stages endpoint
/applications/{app-id}/stages. It can now be called as:
/applications/{app-id}/stages?details=[true|false]&status=[ACTIVE|COMPLETE|FAILED|PENDING|SKIPPED]&withSummaries=[true|false]&quantiles=[comma separated quantiles string]&taskStatus=[RUNNING|SUCCESS|FAILED|KILLED|UNKNOWN]
where
```
Query parameter details=true shows the detailed task information within each stage. The default is details=false.
Query parameter status selects only the stages with the specified status. When status is not specified, all stages are listed.
Query parameter withSummaries=true shows both the task metrics and the executor metrics as percentile distributions. The default is withSummaries=false.
Query parameter quantiles supports user-defined quantiles and takes effect only when withSummaries=true. The default is `0.0,0.25,0.5,0.75,1.0`.
Query parameter taskStatus restricts the output to tasks with the specified status within each stage. It takes effect only when details=true and is ignored when details=false.
```
### Why are the changes needed?
A more flexible RESTful API.
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
UT
Closes #31204 from AngersZhuuuu/SPARK-26399-NEW.
Lead-authored-by: Angerszhuuuu <an...@gmail.com>
Co-authored-by: AngersZhuuuu <an...@gmail.com>
Signed-off-by: Sean Owen <sr...@gmail.com>
---
.../org/apache/spark/status/AppStatusStore.scala | 16 ++++++++---
.../spark/status/api/v1/StagesResource.scala | 31 ++++++++++++++++++++--
docs/monitoring.md | 6 ++++-
3 files changed, 47 insertions(+), 6 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index 1d34d8a..8d9c7cc 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -104,13 +104,23 @@ private[spark] class AppStatusStore(
listener.map(_.activeStages()).getOrElse(Nil)
}
- def stageList(statuses: JList[v1.StageStatus]): Seq[v1.StageData] = {
+ def stageList(
+ statuses: JList[v1.StageStatus],
+ details: Boolean = false,
+ withSummaries: Boolean = false,
+ unsortedQuantiles: Array[Double] = Array.empty,
+ taskStatus: JList[v1.TaskStatus] = List().asJava): Seq[v1.StageData] = {
+ val quantiles = unsortedQuantiles.sorted
val it = store.view(classOf[StageDataWrapper]).reverse().asScala.map(_.info)
- if (statuses != null && !statuses.isEmpty()) {
+ val ret = if (statuses != null && !statuses.isEmpty()) {
it.filter { s => statuses.contains(s.status) }.toSeq
} else {
it.toSeq
}
+ ret.map { s =>
+ newStageData(s, withDetail = details, taskStatus = taskStatus,
+ withSummaries = withSummaries, unsortedQuantiles = quantiles)
+ }
}
def stageData(
@@ -472,7 +482,7 @@ private[spark] class AppStatusStore(
def newStageData(
stage: v1.StageData,
withDetail: Boolean = false,
- taskStatus: JList[v1.TaskStatus],
+ taskStatus: JList[v1.TaskStatus] = List().asJava,
withSummaries: Boolean = false,
unsortedQuantiles: Array[Double] = Array.empty[Double]): v1.StageData = {
if (!withDetail && !withSummaries) {
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala
index af66a73..26dfa5a 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala
@@ -20,6 +20,9 @@ import java.util.{HashMap, List => JList, Locale}
import javax.ws.rs.{NotFoundException => _, _}
import javax.ws.rs.core.{Context, MediaType, MultivaluedMap, UriInfo}
+import scala.collection.JavaConverters._
+
+import org.apache.spark.status.api.v1.TaskStatus._
import org.apache.spark.ui.UIUtils
import org.apache.spark.ui.jobs.ApiHelper._
import org.apache.spark.util.Utils
@@ -28,8 +31,32 @@ import org.apache.spark.util.Utils
private[v1] class StagesResource extends BaseAppResource {
@GET
- def stageList(@QueryParam("status") statuses: JList[StageStatus]): Seq[StageData] = {
- withUI(_.store.stageList(statuses))
+ def stageList(
+ @QueryParam("status") statuses: JList[StageStatus],
+ @QueryParam("details") @DefaultValue("false") details: Boolean,
+ @QueryParam("withSummaries") @DefaultValue("false") withSummaries: Boolean,
+ @QueryParam("quantiles") @DefaultValue("0.0,0.25,0.5,0.75,1.0") quantileString: String,
+ @QueryParam("taskStatus") taskStatus: JList[TaskStatus]): Seq[StageData] = {
+ withUI {
+ val quantiles = parseQuantileString(quantileString)
+ ui => {
+ ui.store.stageList(statuses, details, withSummaries, quantiles, taskStatus)
+ .filter { stage =>
+ if (details && taskStatus.asScala.nonEmpty) {
+ taskStatus.asScala.exists {
+ case FAILED => stage.numFailedTasks > 0
+ case KILLED => stage.numKilledTasks > 0
+ case RUNNING => stage.numActiveTasks > 0
+ case SUCCESS => stage.numCompleteTasks > 0
+ case UNKNOWN => stage.numTasks - stage.numFailedTasks - stage.numKilledTasks -
+ stage.numActiveTasks - stage.numCompleteTasks > 0
+ }
+ } else {
+ true
+ }
+ }
+ }
+ }
}
@GET
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 6157244..14c8c4f 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -472,7 +472,11 @@ can be identified by their `[attempt-id]`. In the API listed below, when running
<td><code>/applications/[app-id]/stages</code></td>
<td>
A list of all stages for a given application.
- <br><code>?status=[active|complete|pending|failed]</code> list only stages in the state.
+ <br><code>?status=[active|complete|pending|failed]</code> list only stages in the given state.
+ <br><code>?details=true</code> lists all stages with the task data.
+ <br><code>?taskStatus=[RUNNING|SUCCESS|FAILED|KILLED|PENDING]</code> lists stages only those tasks with the specified task status. Query parameter taskStatus takes effect only when <code>details=true</code>.
+ <br><code>?withSummaries=true</code> lists stages with task metrics distribution and executor metrics distribution.
+ <br><code>?quantiles=0.0,0.25,0.5,0.75,1.0</code> summarize the metrics with the given quantiles. Query parameter quantiles takes effect only when <code>withSummaries=true</code>. Default value is <code>0.0,0.25,0.5,0.75,1.0</code>.
</td>
</tr>
<tr>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org