Posted to commits@spark.apache.org by ir...@apache.org on 2015/05/05 14:26:10 UTC
[10/10] spark git commit: [SPARK-3454] separate json endpoints for data in the UI
[SPARK-3454] separate json endpoints for data in the UI
Exposes data available in the UI as json over http. Key points:
* new endpoints, handled independently of existing XyzPage classes. Root entrypoint is `JsonRootResource` (see the example request below)
* Uses jersey + jackson for routing & converting POJOs into json
* tests against known results in `HistoryServerSuite`
* also fixes some minor issues with the UI -- synchronizing access to `StorageListener` & `StorageStatusListener`, and fixing some inconsistencies in how we handle retained jobs & stages.
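For illustration, a minimal sketch of querying the new API from Scala. The /api/v1 mount point follows the docs/monitoring.md added in this commit, the port assumes a live SparkUI on its default 4040, and the app id is borrowed from the test expectations below:

    import scala.io.Source

    object JsonApiProbe {
      def main(args: Array[String]): Unit = {
        // List all applications known to this UI, as json.
        println(Source.fromURL("http://localhost:4040/api/v1/applications").mkString)
        // Drill into one application's jobs, filtered by status.
        println(Source.fromURL(
          "http://localhost:4040/api/v1/applications/local-1422981780767/jobs?status=succeeded"
        ).mkString)
      }
    }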
Author: Imran Rashid <ir...@cloudera.com>
Closes #4435 from squito/SPARK-3454 and squashes the following commits:
da1e35f [Imran Rashid] typos etc.
5e78b4f [Imran Rashid] fix rendering problems
5ae02ad [Imran Rashid] Merge branch 'master' into SPARK-3454
f016182 [Imran Rashid] change all json-pojo class constructors to be private[spark] to protect us from mima false-positives if we add fields
3347b72 [Imran Rashid] mark EnumUtil as @Private
ec140a2 [Imran Rashid] create @Private
cc1febf [Imran Rashid] add docs on the metrics-as-json api
cbaf287 [Imran Rashid] Merge branch 'master' into SPARK-3454
56db31e [Imran Rashid] update tests for multi-attempt
7f3bc4e [Imran Rashid] Revert "add sbt-revolver plugin, to make it easier to start & stop http servers in sbt"
67008b4 [Imran Rashid] rats
9e51400 [Imran Rashid] style
c9bae1c [Imran Rashid] handle multiple attempts per app
b87cd63 [Imran Rashid] add sbt-revolver plugin, to make it easier to start & stop http servers in sbt
188762c [Imran Rashid] multi-attempt
2af11e5 [Imran Rashid] Merge branch 'master' into SPARK-3454
befff0c [Imran Rashid] review feedback
14ac3ed [Imran Rashid] jersey-core needs to be explicit; move version & scope to parent pom.xml
f90680e [Imran Rashid] Merge branch 'master' into SPARK-3454
dc8a7fe [Imran Rashid] style, fix errant comments
acb7ef6 [Imran Rashid] fix indentation
7bf1811 [Imran Rashid] move MetricHelper so mima doesn't think it's exposed; comments
9d889d6 [Imran Rashid] undo some unnecessary changes
f48a7b0 [Imran Rashid] docs
52bbae8 [Imran Rashid] StorageListener & StorageStatusListener needs to synchronize internally to be thread-safe
31c79ce [Imran Rashid] asm no longer needed for SPARK_PREPEND_CLASSES
b2f8b91 [Imran Rashid] @DeveloperApi
2e19be2 [Imran Rashid] lazily convert ApplicationInfo to avoid memory overhead
ba3d9d2 [Imran Rashid] upper case enums
39ac29c [Imran Rashid] move EnumUtil
d2bde77 [Imran Rashid] update error handling & scoping
4a234d3 [Imran Rashid] avoid jersey-media-json-jackson b/c of potential version conflicts
a157a2f [Imran Rashid] style
7bd4d15 [Imran Rashid] delete security test, since it doesn't do anything
a325563 [Imran Rashid] style
a9c5cf1 [Imran Rashid] undo changes superseded by master
0c6f968 [Imran Rashid] update deps
1ed0d07 [Imran Rashid] Merge branch 'master' into SPARK-3454
4c92af6 [Imran Rashid] style
f2e63ad [Imran Rashid] Merge branch 'master' into SPARK-3454
c22b11f [Imran Rashid] fix compile error
9ea682c [Imran Rashid] go back to good ol' java enums
cf86175 [Imran Rashid] style
d493b38 [Imran Rashid] Merge branch 'master' into SPARK-3454
f05ae89 [Imran Rashid] add in ExecutorSummaryInfo for MiMa :(
101a698 [Imran Rashid] style
d2ef58d [Imran Rashid] revert changes that had HistoryServer refresh the application listing more often
b136e39b [Imran Rashid] Revert "add sbt-revolver plugin, to make it easier to start & stop http servers in sbt"
e031719 [Imran Rashid] fixes from review
1f53a66 [Imran Rashid] style
b4a7863 [Imran Rashid] fix compile error
2c8b7ee [Imran Rashid] rats
1578a4a [Imran Rashid] doc
674f8dc [Imran Rashid] more explicit about total numbers of jobs & stages vs. number retained
9922be0 [Imran Rashid] Merge branch 'master' into stage_distributions
f5a5196 [Imran Rashid] undo removal of renderJson from MasterPage, since there is no substitute yet
db61211 [Imran Rashid] get JobProgressListener directly from UI
fdfc181 [Imran Rashid] stage/taskList
63eb4a6 [Imran Rashid] tests for taskSummary
ad27de8 [Imran Rashid] error handling on quantile values
b2efcaf [Imran Rashid] cleanup, combine stage-related paths into one resource
aaba896 [Imran Rashid] wire up task summary
a4b1397 [Imran Rashid] stage metric distributions
e48ba32 [Imran Rashid] rename
eaf3bbb [Imran Rashid] style
25cd894 [Imran Rashid] if only given day, assume GMT
51eaedb [Imran Rashid] more visibility fixes
9f28b7e [Imran Rashid] ack, more cleanup
99764e1 [Imran Rashid] Merge branch 'SPARK-3454_w_jersey' into SPARK-3454
a61a43c [Imran Rashid] oops, remove accidental checkin
a066055 [Imran Rashid] set visibility on a lot of classes
1f361c8 [Imran Rashid] update rat-excludes
0be5120 [Imran Rashid] Merge branch 'master' into SPARK-3454_w_jersey
2382bef [Imran Rashid] switch to using new "enum"
fef6605 [Imran Rashid] some utils for working w/ new "enum" format
dbfc7bf [Imran Rashid] style
b86bcb0 [Imran Rashid] update test to look at one stage attempt
5f9df24 [Imran Rashid] style
7fd156a [Imran Rashid] refactor jsonDiff to avoid code duplication
73f1378 [Imran Rashid] test json; also add test cases for cleaned stages & jobs
97d411f [Imran Rashid] json endpoint for one job
0c96147 [Imran Rashid] better error msgs for bad stageId vs bad attemptId
dddbd29 [Imran Rashid] stages have attempt; jobs are sorted; resource for all attempts for one stage
190c17a [Imran Rashid] StagePage should distinguish no task data, from unknown stage
84cd497 [Imran Rashid] AllJobsPage should still report correct completed & failed job count, even if some have been cleaned, to make it consistent w/ AllStagesPage
36e4062 [Imran Rashid] SparkUI needs to know about startTime, so it can list its own applicationInfo
b4c75ed [Imran Rashid] fix merge conflicts; need to widen visibility in a few cases
e91750a [Imran Rashid] Merge branch 'master' into SPARK-3454_w_jersey
56d2fc7 [Imran Rashid] jersey needs asm for SPARK_PREPEND_CLASSES to work
f7df095 [Imran Rashid] add test for accumulables, and discover that I need update after all
9c0c125 [Imran Rashid] add accumulableInfo
00e9cc5 [Imran Rashid] more style
3377e61 [Imran Rashid] scaladoc
d05f7a9 [Imran Rashid] don't use case classes for status api POJOs, since they have binary compatibility issues
654cecf [Imran Rashid] move all the status api POJOs to one file
b86e2b0 [Imran Rashid] style
18a8c45 [Imran Rashid] Merge branch 'master' into SPARK-3454_w_jersey
5598f19 [Imran Rashid] delete some unnecessary code, more to go
56edce0 [Imran Rashid] style
017c755 [Imran Rashid] add in metrics now available
1b78cb7 [Imran Rashid] fix some import ordering
0dc3ea7 [Imran Rashid] if app isn't found, reload apps from FS before giving up
c7d884f [Imran Rashid] fix merge conflicts
0c12b50 [Imran Rashid] Merge branch 'master' into SPARK-3454_w_jersey
b6a96a8 [Imran Rashid] compare json by AST, not string
cd37845 [Imran Rashid] switch to using java.util.Dates for times
a4ab5aa [Imran Rashid] add in explicit dependency on jersey 1.9 -- maven wasn't happy before this
4fdc39f [Imran Rashid] refactor case insensitive enum parsing
cba1ef6 [Imran Rashid] add security (maybe?) for metrics json
f0264a7 [Imran Rashid] switch to using jersey for metrics json
bceb3a9 [Imran Rashid] set http response code on error, some testing
e0356b6 [Imran Rashid] put new test expectation files in rat excludes (is this OK?)
b252e7a [Imran Rashid] small cleanup of accidental changes
d1a8c92 [Imran Rashid] add sbt-revolver plugin, to make it easier to start & stop http servers in sbt
4b398d0 [Imran Rashid] expose UI data as json in new endpoints
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d4973580
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d4973580
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d4973580
Branch: refs/heads/master
Commit: d49735800db27239c11478aac4b0f2ec9df91a3f
Parents: 51f4620
Author: Imran Rashid <ir...@cloudera.com>
Authored: Tue May 5 07:25:40 2015 -0500
Committer: Imran Rashid <ir...@cloudera.com>
Committed: Tue May 5 07:25:40 2015 -0500
----------------------------------------------------------------------
.rat-excludes | 7 +
core/pom.xml | 8 +
.../org/apache/spark/JobExecutionStatus.java | 8 +-
.../spark/status/api/v1/ApplicationStatus.java | 30 +
.../apache/spark/status/api/v1/StageStatus.java | 31 +
.../apache/spark/status/api/v1/TaskSorting.java | 48 +
.../java/org/apache/spark/util/EnumUtil.java | 38 +
.../scala/org/apache/spark/SparkContext.scala | 2 +-
.../org/apache/spark/annotation/Private.java | 41 +
.../history/ApplicationHistoryProvider.scala | 4 +-
.../deploy/history/FsHistoryProvider.scala | 14 +-
.../spark/deploy/history/HistoryServer.scala | 20 +-
.../spark/deploy/master/ApplicationInfo.scala | 2 +-
.../org/apache/spark/deploy/master/Master.scala | 14 +-
.../deploy/master/ui/ApplicationPage.scala | 19 +-
.../spark/deploy/master/ui/MasterPage.scala | 12 +-
.../spark/deploy/master/ui/MasterWebUI.scala | 24 +-
.../spark/status/api/v1/AllJobsResource.scala | 98 +
.../spark/status/api/v1/AllRDDResource.scala | 104 +
.../spark/status/api/v1/AllStagesResource.scala | 309 +
.../status/api/v1/ApplicationListResource.scala | 94 +
.../status/api/v1/ExecutorListResource.scala | 36 +
.../status/api/v1/JacksonMessageWriter.scala | 93 +
.../spark/status/api/v1/JsonRootResource.scala | 255 +
.../status/api/v1/OneApplicationResource.scala | 31 +
.../spark/status/api/v1/OneJobResource.scala | 41 +
.../spark/status/api/v1/OneRDDResource.scala | 34 +
.../spark/status/api/v1/OneStageResource.scala | 150 +
.../spark/status/api/v1/SecurityFilter.scala | 38 +
.../spark/status/api/v1/SimpleDateParam.scala | 55 +
.../org/apache/spark/status/api/v1/api.scala | 228 +
.../spark/storage/StorageStatusListener.scala | 6 +-
.../scala/org/apache/spark/ui/SparkUI.scala | 49 +-
.../main/scala/org/apache/spark/ui/WebUI.scala | 8 +-
.../apache/spark/ui/exec/ExecutorsPage.scala | 17 +-
.../org/apache/spark/ui/jobs/AllJobsPage.scala | 14 +-
.../apache/spark/ui/jobs/AllStagesPage.scala | 12 +-
.../org/apache/spark/ui/jobs/JobPage.scala | 2 +-
.../spark/ui/jobs/JobProgressListener.scala | 4 +
.../org/apache/spark/ui/jobs/PoolPage.scala | 2 +-
.../org/apache/spark/ui/jobs/StagePage.scala | 19 +-
.../org/apache/spark/ui/storage/RDDPage.scala | 73 +-
.../apache/spark/ui/storage/StoragePage.scala | 2 +-
.../apache/spark/ui/storage/StorageTab.scala | 6 +-
.../applications/json_expectation | 53 +
.../executors/json_expectation | 17 +
.../local-1422981780767/jobs/0/json_expectation | 15 +
.../local-1422981780767/jobs/json_expectation | 43 +
.../json_expectation | 43 +
.../jobs?status=succeeded/json_expectation | 29 +
.../local-1422981780767/json_expectation | 10 +
.../stages/1/0/json_expectation | 270 +
.../stages/1/json_expectation | 270 +
.../local-1422981780767/stages/json_expectation | 89 +
.../stages?status=complete/json_expectation | 67 +
.../stages?status=failed/json_expectation | 23 +
.../storage/rdd/0/json_expectation | 64 +
.../storage/rdd/json_expectation | 9 +
.../local-1426533911241/1/jobs/json_expectation | 15 +
.../1/stages/0/0/json_expectation | 242 +
.../1/stages/0/0/taskList/json_expectation | 193 +
.../1/stages/json_expectation | 27 +
.../local-1426533911241/2/jobs/json_expectation | 15 +
.../2/stages/0/0/taskList/json_expectation | 193 +
.../local-1426533911241/json_expectation | 17 +
.../stages/20/0/taskList/json_expectation | 481 +
.../json_expectation | 1201 ++
.../0/taskList?sortBy=-runtime/json_expectation | 481 +
.../json_expectation | 481 +
.../0/taskList?sortBy=runtime/json_expectation | 481 +
.../stages/20/0/taskSummary/json_expectation | 15 +
.../json_expectation | 15 +
.../json_expectation | 10 +
.../json_expectation | 19 +
.../json_expectation | 35 +
.../json_expectation | 53 +
.../json_expectation | 1 +
.../local-1422981759269/APPLICATION_COMPLETE | 0
.../local-1422981759269/EVENT_LOG_1 | 88 +
.../local-1422981759269/SPARK_VERSION_1.2.0 | 0
.../local-1422981780767/APPLICATION_COMPLETE | 0
.../local-1422981780767/EVENT_LOG_1 | 82 +
.../local-1422981780767/SPARK_VERSION_1.2.0 | 0
.../local-1425081759269/APPLICATION_COMPLETE | 0
.../local-1425081759269/EVENT_LOG_1 | 88 +
.../local-1425081759269/SPARK_VERSION_1.2.0 | 0
.../local-1426533911241/APPLICATION_COMPLETE | 0
.../local-1426533911241/EVENT_LOG_1 | 24 +
.../local-1426533911241/SPARK_VERSION_1.2.0 | 0
.../local-1426633911242/APPLICATION_COMPLETE | 0
.../local-1426633911242/EVENT_LOG_1 | 24 +
.../local-1426633911242/SPARK_VERSION_1.2.0 | 0
.../resources/spark-events/local-1427397477963 | 12083 +++++++++++++++++
.../scala/org/apache/spark/JsonTestUtils.scala | 34 +
.../apache/spark/deploy/JsonProtocolSuite.scala | 14 +-
.../deploy/history/HistoryServerSuite.scala | 223 +-
.../status/api/v1/SimpleDateParamTest.scala | 29 +
.../org/apache/spark/ui/UISeleniumSuite.scala | 264 +-
docs/monitoring.md | 74 +
pom.xml | 12 +
100 files changed, 19946 insertions(+), 172 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/.rat-excludes
----------------------------------------------------------------------
diff --git a/.rat-excludes b/.rat-excludes
index dccf2db..ac652ed 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -74,5 +74,12 @@ logs
.*scalastyle-output.xml
.*dependency-reduced-pom.xml
known_translations
+json_expectation
+local-1422981759269/*
+local-1422981780767/*
+local-1425081759269/*
+local-1426533911241/*
+local-1426633911242/*
+local-1427397477963/*
DESCRIPTION
NAMESPACE
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 164a836..fc42f48 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -229,6 +229,14 @@
<version>3.2.10</version>
</dependency>
<dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.mesos</groupId>
<artifactId>mesos</artifactId>
<classifier>${mesos.classifier}</classifier>
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/java/org/apache/spark/JobExecutionStatus.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/JobExecutionStatus.java b/core/src/main/java/org/apache/spark/JobExecutionStatus.java
index 6e16131..0287fb7 100644
--- a/core/src/main/java/org/apache/spark/JobExecutionStatus.java
+++ b/core/src/main/java/org/apache/spark/JobExecutionStatus.java
@@ -17,9 +17,15 @@
package org.apache.spark;
+import org.apache.spark.util.EnumUtil;
+
public enum JobExecutionStatus {
RUNNING,
SUCCEEDED,
FAILED,
- UNKNOWN
+ UNKNOWN;
+
+ public static JobExecutionStatus fromString(String str) {
+ return EnumUtil.parseIgnoreCase(JobExecutionStatus.class, str);
+ }
}
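A quick sketch of the parser this adds; input casing is arbitrary:

    import org.apache.spark.JobExecutionStatus

    object JobStatusParseDemo {
      def main(args: Array[String]): Unit = {
        // "running", "RUNNING", and "Running" all resolve to the same constant.
        assert(JobExecutionStatus.fromString("running") == JobExecutionStatus.RUNNING)
        assert(JobExecutionStatus.fromString("Failed") == JobExecutionStatus.FAILED)
      }
    }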
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/java/org/apache/spark/status/api/v1/ApplicationStatus.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/status/api/v1/ApplicationStatus.java b/core/src/main/java/org/apache/spark/status/api/v1/ApplicationStatus.java
new file mode 100644
index 0000000..8c7dcf7
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/status/api/v1/ApplicationStatus.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1;
+
+import org.apache.spark.util.EnumUtil;
+
+public enum ApplicationStatus {
+ COMPLETED,
+ RUNNING;
+
+ public static ApplicationStatus fromString(String str) {
+ return EnumUtil.parseIgnoreCase(ApplicationStatus.class, str);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/java/org/apache/spark/status/api/v1/StageStatus.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/status/api/v1/StageStatus.java b/core/src/main/java/org/apache/spark/status/api/v1/StageStatus.java
new file mode 100644
index 0000000..9dbb565
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/status/api/v1/StageStatus.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1;
+
+import org.apache.spark.util.EnumUtil;
+
+public enum StageStatus {
+ ACTIVE,
+ COMPLETE,
+ FAILED,
+ PENDING;
+
+ public static StageStatus fromString(String str) {
+ return EnumUtil.parseIgnoreCase(StageStatus.class, str);
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/java/org/apache/spark/status/api/v1/TaskSorting.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/status/api/v1/TaskSorting.java b/core/src/main/java/org/apache/spark/status/api/v1/TaskSorting.java
new file mode 100644
index 0000000..f19ed01
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/status/api/v1/TaskSorting.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1;
+
+import org.apache.spark.util.EnumUtil;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public enum TaskSorting {
+ ID,
+ INCREASING_RUNTIME("runtime"),
+ DECREASING_RUNTIME("-runtime");
+
+ private final Set<String> alternateNames;
+ private TaskSorting(String... names) {
+ alternateNames = new HashSet<String>();
+ for (String n: names) {
+ alternateNames.add(n);
+ }
+ }
+
+ public static TaskSorting fromString(String str) {
+ String lower = str.toLowerCase();
+ for (TaskSorting t: values()) {
+ if (t.alternateNames.contains(lower)) {
+ return t;
+ }
+ }
+ return EnumUtil.parseIgnoreCase(TaskSorting.class, str);
+ }
+
+}
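A sketch of how the alternate names behave; they back the ?sortBy=runtime and ?sortBy=-runtime query parameters seen in the taskList test expectations above:

    import org.apache.spark.status.api.v1.TaskSorting

    object TaskSortingDemo {
      def main(args: Array[String]): Unit = {
        // Alternate names registered in the constructor are checked first...
        assert(TaskSorting.fromString("-runtime") == TaskSorting.DECREASING_RUNTIME)
        assert(TaskSorting.fromString("runtime") == TaskSorting.INCREASING_RUNTIME)
        // ...then anything else falls through to case-insensitive constant matching.
        assert(TaskSorting.fromString("id") == TaskSorting.ID)
      }
    }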
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/java/org/apache/spark/util/EnumUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/EnumUtil.java b/core/src/main/java/org/apache/spark/util/EnumUtil.java
new file mode 100644
index 0000000..c40c7e7
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/util/EnumUtil.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util;
+
+import com.google.common.base.Joiner;
+import org.apache.spark.annotation.Private;
+
+@Private
+public class EnumUtil {
+ public static <E extends Enum<E>> E parseIgnoreCase(Class<E> clz, String str) {
+ E[] constants = clz.getEnumConstants();
+ if (str == null) {
+ return null;
+ }
+ for (E e : constants) {
+ if (e.name().equalsIgnoreCase(str)) {
+ return e;
+ }
+ }
+ throw new IllegalArgumentException(
+ String.format("Illegal type='%s'. Supported type values: %s",
+ str, Joiner.on(", ").join(constants)));
+ }
+}
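For illustration, the contract of parseIgnoreCase; the error text follows the String.format above:

    import org.apache.spark.status.api.v1.StageStatus
    import org.apache.spark.util.EnumUtil

    object EnumUtilDemo {
      def main(args: Array[String]): Unit = {
        // Matching is case-insensitive.
        assert(EnumUtil.parseIgnoreCase(classOf[StageStatus], "complete") == StageStatus.COMPLETE)
        // null passes through as null rather than throwing.
        assert(EnumUtil.parseIgnoreCase(classOf[StageStatus], null) == null)
        // Unknown values fail fast, listing the supported constants:
        // Illegal type='finished'. Supported type values: ACTIVE, COMPLETE, FAILED, PENDING
        try EnumUtil.parseIgnoreCase(classOf[StageStatus], "finished")
        catch { case e: IllegalArgumentException => println(e.getMessage) }
      }
    }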
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index b98a54b..7ebee99 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -428,7 +428,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
_ui =
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
- _env.securityManager,appName))
+ _env.securityManager,appName, startTime = startTime))
} else {
// For tests, do not enable the UI
None
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/annotation/Private.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/annotation/Private.java b/core/src/main/scala/org/apache/spark/annotation/Private.java
new file mode 100644
index 0000000..9082fcf
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/annotation/Private.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * A class that is considered private to the internals of Spark -- there is a high likelihood
+ * it will be changed in future versions of Spark.
+ *
+ * This should be used only when the standard Scala / Java means of protecting classes are
+ * insufficient. In particular, Java has no equivalent of private[spark], so we use this annotation
+ * in its place.
+ *
+ * NOTE: If there exists a Scaladoc comment that immediately precedes this annotation, the first
+ * line of the comment must be ":: Private ::" with no trailing blank line. This is because
+ * of the known issue that Scaladoc displays only either the annotation or the comment, whichever
+ * comes first.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE, ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER,
+ ElementType.CONSTRUCTOR, ElementType.LOCAL_VARIABLE, ElementType.PACKAGE})
+public @interface Private {}
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index 6a5011a..298a820 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.history
import org.apache.spark.ui.SparkUI
-private[history] case class ApplicationAttemptInfo(
+private[spark] case class ApplicationAttemptInfo(
attemptId: Option[String],
startTime: Long,
endTime: Long,
@@ -27,7 +27,7 @@ private[history] case class ApplicationAttemptInfo(
sparkUser: String,
completed: Boolean = false)
-private[history] case class ApplicationHistoryInfo(
+private[spark] case class ApplicationHistoryInfo(
id: String,
name: String,
attempts: List[ApplicationAttemptInfo])
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 993763f..45c2be3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -17,23 +17,21 @@
package org.apache.spark.deploy.history
-import java.io.{IOException, BufferedInputStream, FileNotFoundException, InputStream}
+import java.io.{BufferedInputStream, FileNotFoundException, IOException, InputStream}
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
import scala.collection.mutable
-import scala.concurrent.duration.Duration
-import com.google.common.util.concurrent.ThreadFactoryBuilder
-
-import com.google.common.util.concurrent.MoreExecutors
-import org.apache.hadoop.fs.permission.AccessControlException
+import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.permission.AccessControlException
+
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.CompressionCodec
import org.apache.spark.scheduler._
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
/**
* A class that provides application history from event logs stored in the file system.
@@ -151,7 +149,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val conf = this.conf.clone()
val appSecManager = new SecurityManager(conf)
SparkUI.createHistoryUI(conf, replayBus, appSecManager, appId,
- HistoryServer.getAttemptURI(appId, attempt.attemptId))
+ HistoryServer.getAttemptURI(appId, attempt.attemptId), attempt.startTime)
// Do not call ui.bind() to avoid creating a new server for each application
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 754c8e9..50522e6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -25,6 +25,7 @@ import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.status.api.v1.{ApplicationInfo, ApplicationsListResource, JsonRootResource, UIRoot}
import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.{SignalLogger, Utils}
@@ -45,7 +46,7 @@ class HistoryServer(
provider: ApplicationHistoryProvider,
securityManager: SecurityManager,
port: Int)
- extends WebUI(securityManager, port, conf) with Logging {
+ extends WebUI(securityManager, port, conf) with Logging with UIRoot {
// How many applications to retain
private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50)
@@ -56,7 +57,7 @@ class HistoryServer(
require(parts.length == 1 || parts.length == 2, s"Invalid app key $key")
val ui = provider
.getAppUI(parts(0), if (parts.length > 1) Some(parts(1)) else None)
- .getOrElse(throw new NoSuchElementException())
+ .getOrElse(throw new NoSuchElementException(s"no app with key $key"))
attachSparkUI(ui)
ui
}
@@ -113,6 +114,10 @@ class HistoryServer(
}
}
+ def getSparkUI(appKey: String): Option[SparkUI] = {
+ Option(appCache.get(appKey))
+ }
+
initialize()
/**
@@ -123,6 +128,9 @@ class HistoryServer(
*/
def initialize() {
attachPage(new HistoryPage(this))
+
+ attachHandler(JsonRootResource.getJsonServlet(this))
+
attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
val contextHandler = new ServletContextHandler
@@ -160,7 +168,13 @@ class HistoryServer(
*
* @return List of all known applications.
*/
- def getApplicationList(): Iterable[ApplicationHistoryInfo] = provider.getListing()
+ def getApplicationList(): Iterable[ApplicationHistoryInfo] = {
+ provider.getListing()
+ }
+
+ def getApplicationInfoList: Iterator[ApplicationInfo] = {
+ getApplicationList().iterator.map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
+ }
/**
* Returns the provider configuration to show in the listing page.
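The UIRoot trait these classes now mix in is defined in JsonRootResource.scala (not shown in this excerpt); judging only from the implementations in this diff, its shape is roughly:

    // Sketch inferred from the HistoryServer and MasterWebUI implementations;
    // see JsonRootResource.scala in this commit for the real definition.
    private[spark] trait UIRoot {
      def getSparkUI(appKey: String): Option[SparkUI]
      def getApplicationInfoList: Iterator[ApplicationInfo]
    }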
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index f59d550..1620e95 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -28,7 +28,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.deploy.ApplicationDescription
import org.apache.spark.util.Utils
-private[deploy] class ApplicationInfo(
+private[spark] class ApplicationInfo(
val startTime: Long,
val id: String,
val desc: ApplicationDescription,
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 0fac3cd..53e1903 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -754,9 +754,9 @@ private[master] class Master(
/**
* Rebuild a new SparkUI from the given application's event logs.
- * Return whether this is successful.
+ * Return the UI if successful, else None
*/
- private def rebuildSparkUI(app: ApplicationInfo): Boolean = {
+ private[master] def rebuildSparkUI(app: ApplicationInfo): Option[SparkUI] = {
val appName = app.desc.name
val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found"
try {
@@ -764,7 +764,7 @@ private[master] class Master(
.getOrElse {
// Event logging is not enabled for this application
app.desc.appUiUrl = notFoundBasePath
- return false
+ return None
}
val eventLogFilePrefix = EventLoggingListener.getLogPath(
@@ -787,7 +787,7 @@ private[master] class Master(
val logInput = EventLoggingListener.openEventLog(new Path(eventLogFile), fs)
val replayBus = new ReplayListenerBus()
val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
- appName + status, HistoryServer.UI_PATH_PREFIX + s"/${app.id}")
+ appName + status, HistoryServer.UI_PATH_PREFIX + s"/${app.id}", app.startTime)
val maybeTruncated = eventLogFile.endsWith(EventLoggingListener.IN_PROGRESS)
try {
replayBus.replay(logInput, eventLogFile, maybeTruncated)
@@ -798,7 +798,7 @@ private[master] class Master(
webUi.attachSparkUI(ui)
// Application UI is successfully rebuilt, so link the Master UI to it
app.desc.appUiUrl = ui.basePath
- true
+ Some(ui)
} catch {
case fnf: FileNotFoundException =>
// Event logging is enabled for this application, but no event logs are found
@@ -808,7 +808,7 @@ private[master] class Master(
msg += " Did you specify the correct logging directory?"
msg = URLEncoder.encode(msg, "UTF-8")
app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&title=$title"
- false
+ None
case e: Exception =>
// Relay exception message to application UI page
val title = s"Application history load error (${app.id})"
@@ -817,7 +817,7 @@ private[master] class Master(
logError(msg, e)
msg = URLEncoder.encode(msg, "UTF-8")
app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&exception=$exception&title=$title"
- false
+ None
}
}
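Returning Option[SparkUI] rather than Boolean lets MasterWebUI.getSparkUI (below) hand the rebuilt UI straight to the JSON layer; a hypothetical caller in the same package might look like:

    // Hypothetical caller of the new Option-returning signature.
    package org.apache.spark.deploy.master

    object RebuildDemo {
      def describe(master: Master, app: ApplicationInfo): String =
        master.rebuildSparkUI(app) match {
          case Some(ui) => s"history UI rebuilt at ${ui.basePath}"
          case None     => s"no usable event logs for ${app.id}"
        }
    }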
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index 273f077..06e265f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -23,10 +23,8 @@ import scala.concurrent.Await
import scala.xml.Node
import akka.pattern.ask
-import org.json4s.JValue
-import org.json4s.JsonAST.JNothing
-import org.apache.spark.deploy.{ExecutorState, JsonProtocol}
+import org.apache.spark.deploy.ExecutorState
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.ExecutorDesc
import org.apache.spark.ui.{UIUtils, WebUIPage}
@@ -38,21 +36,6 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
private val timeout = parent.timeout
/** Executor details for a particular application */
- override def renderJson(request: HttpServletRequest): JValue = {
- val appId = request.getParameter("appId")
- val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
- val state = Await.result(stateFuture, timeout)
- val app = state.activeApps.find(_.id == appId).getOrElse({
- state.completedApps.find(_.id == appId).getOrElse(null)
- })
- if (app == null) {
- JNothing
- } else {
- JsonProtocol.writeApplicationInfo(app)
- }
- }
-
- /** Executor details for a particular application */
def render(request: HttpServletRequest): Seq[Node] = {
val appId = request.getParameter("appId")
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
index 1f2c3fd..7569276 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
@@ -35,10 +35,13 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
private val master = parent.masterActorRef
private val timeout = parent.timeout
- override def renderJson(request: HttpServletRequest): JValue = {
+ def getMasterState: MasterStateResponse = {
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
- val state = Await.result(stateFuture, timeout)
- JsonProtocol.writeMasterState(state)
+ Await.result(stateFuture, timeout)
+ }
+
+ override def renderJson(request: HttpServletRequest): JValue = {
+ JsonProtocol.writeMasterState(getMasterState)
}
def handleAppKillRequest(request: HttpServletRequest): Unit = {
@@ -68,8 +71,7 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
/** Index view listing applications and executors */
def render(request: HttpServletRequest): Seq[Node] = {
- val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
- val state = Await.result(stateFuture, timeout)
+ val state = getMasterState
val workerHeaders = Seq("Worker Id", "Address", "State", "Cores", "Memory")
val workers = state.workers.sortBy(_.id)
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index dea0a65..eb26e9f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.master.ui
import org.apache.spark.Logging
import org.apache.spark.deploy.master.Master
+import org.apache.spark.status.api.v1.{ApplicationsListResource, ApplicationInfo, JsonRootResource, UIRoot}
import org.apache.spark.ui.{SparkUI, WebUI}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.RpcUtils
@@ -28,12 +29,15 @@ import org.apache.spark.util.RpcUtils
*/
private[master]
class MasterWebUI(val master: Master, requestedPort: Int)
- extends WebUI(master.securityMgr, requestedPort, master.conf, name = "MasterUI") with Logging {
+ extends WebUI(master.securityMgr, requestedPort, master.conf, name = "MasterUI") with Logging
+ with UIRoot {
val masterActorRef = master.self
val timeout = RpcUtils.askTimeout(master.conf)
val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true)
+ val masterPage = new MasterPage(this)
+
initialize()
/** Initialize all components of the server. */
@@ -43,6 +47,7 @@ class MasterWebUI(val master: Master, requestedPort: Int)
attachPage(new HistoryNotFoundPage(this))
attachPage(masterPage)
attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static"))
+ attachHandler(JsonRootResource.getJsonServlet(this))
attachHandler(createRedirectHandler(
"/app/kill", "/", masterPage.handleAppKillRequest, httpMethods = Set("POST")))
attachHandler(createRedirectHandler(
@@ -60,6 +65,23 @@ class MasterWebUI(val master: Master, requestedPort: Int)
assert(serverInfo.isDefined, "Master UI must be bound to a server before detaching SparkUIs")
ui.getHandlers.foreach(detachHandler)
}
+
+ def getApplicationInfoList: Iterator[ApplicationInfo] = {
+ val state = masterPage.getMasterState
+ val activeApps = state.activeApps.sortBy(_.startTime).reverse
+ val completedApps = state.completedApps.sortBy(_.endTime).reverse
+ activeApps.iterator.map { ApplicationsListResource.convertApplicationInfo(_, false) } ++
+ completedApps.iterator.map { ApplicationsListResource.convertApplicationInfo(_, true) }
+ }
+
+ def getSparkUI(appId: String): Option[SparkUI] = {
+ val state = masterPage.getMasterState
+ val activeApps = state.activeApps.sortBy(_.startTime).reverse
+ val completedApps = state.completedApps.sortBy(_.endTime).reverse
+ (activeApps ++ completedApps).find { _.id == appId }.flatMap {
+ master.rebuildSparkUI
+ }
+ }
}
private[master] object MasterWebUI {
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala
new file mode 100644
index 0000000..5783df5
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status.api.v1
+
+import java.util.{Arrays, Date, List => JList}
+import javax.ws.rs._
+import javax.ws.rs.core.MediaType
+
+import org.apache.spark.JobExecutionStatus
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.ui.jobs.JobProgressListener
+import org.apache.spark.ui.jobs.UIData.JobUIData
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class AllJobsResource(ui: SparkUI) {
+
+ @GET
+ def jobsList(@QueryParam("status") statuses: JList[JobExecutionStatus]): Seq[JobData] = {
+ val statusToJobs: Seq[(JobExecutionStatus, Seq[JobUIData])] =
+ AllJobsResource.getStatusToJobs(ui)
+ val adjStatuses: JList[JobExecutionStatus] = {
+ if (statuses.isEmpty) {
+ Arrays.asList(JobExecutionStatus.values(): _*)
+ } else {
+ statuses
+ }
+ }
+ val jobInfos = for {
+ (status, jobs) <- statusToJobs
+ job <- jobs if adjStatuses.contains(status)
+ } yield {
+ AllJobsResource.convertJobData(job, ui.jobProgressListener, false)
+ }
+ jobInfos.sortBy{- _.jobId}
+ }
+
+}
+
+private[v1] object AllJobsResource {
+
+ def getStatusToJobs(ui: SparkUI): Seq[(JobExecutionStatus, Seq[JobUIData])] = {
+ val statusToJobs = ui.jobProgressListener.synchronized {
+ Seq(
+ JobExecutionStatus.RUNNING -> ui.jobProgressListener.activeJobs.values.toSeq,
+ JobExecutionStatus.SUCCEEDED -> ui.jobProgressListener.completedJobs.toSeq,
+ JobExecutionStatus.FAILED -> ui.jobProgressListener.failedJobs.reverse.toSeq
+ )
+ }
+ statusToJobs
+ }
+
+ def convertJobData(
+ job: JobUIData,
+ listener: JobProgressListener,
+ includeStageDetails: Boolean): JobData = {
+ listener.synchronized {
+ val lastStageInfo = listener.stageIdToInfo.get(job.stageIds.max)
+ val lastStageData = lastStageInfo.flatMap { s =>
+ listener.stageIdToData.get((s.stageId, s.attemptId))
+ }
+ val lastStageName = lastStageInfo.map { _.name }.getOrElse("(Unknown Stage Name)")
+ val lastStageDescription = lastStageData.flatMap { _.description }
+ new JobData(
+ jobId = job.jobId,
+ name = lastStageName,
+ description = lastStageDescription,
+ submissionTime = job.submissionTime.map{new Date(_)},
+ completionTime = job.completionTime.map{new Date(_)},
+ stageIds = job.stageIds,
+ jobGroup = job.jobGroup,
+ status = job.status,
+ numTasks = job.numTasks,
+ numActiveTasks = job.numActiveTasks,
+ numCompletedTasks = job.numCompletedTasks,
+ numSkippedTasks = job.numSkippedTasks,
+ numFailedTasks = job.numFailedTasks,
+ numActiveStages = job.numActiveStages,
+ numCompletedStages = job.completedStageIndices.size,
+ numSkippedStages = job.numSkippedStages,
+ numFailedStages = job.numFailedStages
+ )
+ }
+ }
+}
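For illustration, a request to this resource and one element of its response; the values are invented, with field names following the JobData constructor above:

    GET /api/v1/applications/[app-id]/jobs?status=succeeded

    [ {
      "jobId" : 0,
      "name" : "count at <console>:15",
      "stageIds" : [ 0 ],
      "status" : "SUCCEEDED",
      "numTasks" : 8,
      "numActiveTasks" : 0,
      "numCompletedTasks" : 8,
      "numSkippedTasks" : 0,
      "numFailedTasks" : 0,
      "numActiveStages" : 0,
      "numCompletedStages" : 1,
      "numSkippedStages" : 0,
      "numFailedStages" : 0
    } ]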
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
new file mode 100644
index 0000000..645ede2
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status.api.v1
+
+import javax.ws.rs.{GET, Produces}
+import javax.ws.rs.core.MediaType
+
+import org.apache.spark.storage.{RDDInfo, StorageStatus, StorageUtils}
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.ui.storage.StorageListener
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class AllRDDResource(ui: SparkUI) {
+
+ @GET
+ def rddList(): Seq[RDDStorageInfo] = {
+ val storageStatusList = ui.storageListener.storageStatusList
+ val rddInfos = ui.storageListener.rddInfoList
+ rddInfos.map{rddInfo =>
+ AllRDDResource.getRDDStorageInfo(rddInfo.id, rddInfo, storageStatusList,
+ includeDetails = false)
+ }
+ }
+
+}
+
+private[spark] object AllRDDResource {
+
+ def getRDDStorageInfo(
+ rddId: Int,
+ listener: StorageListener,
+ includeDetails: Boolean): Option[RDDStorageInfo] = {
+ val storageStatusList = listener.storageStatusList
+ listener.rddInfoList.find { _.id == rddId }.map { rddInfo =>
+ getRDDStorageInfo(rddId, rddInfo, storageStatusList, includeDetails)
+ }
+ }
+
+ def getRDDStorageInfo(
+ rddId: Int,
+ rddInfo: RDDInfo,
+ storageStatusList: Seq[StorageStatus],
+ includeDetails: Boolean): RDDStorageInfo = {
+ val workers = storageStatusList.map { (rddId, _) }
+ val blockLocations = StorageUtils.getRddBlockLocations(rddId, storageStatusList)
+ val blocks = storageStatusList
+ .flatMap { _.rddBlocksById(rddId) }
+ .sortWith { _._1.name < _._1.name }
+ .map { case (blockId, status) =>
+ (blockId, status, blockLocations.get(blockId).getOrElse(Seq[String]("Unknown")))
+ }
+
+ val dataDistribution = if (includeDetails) {
+ Some(storageStatusList.map { status =>
+ new RDDDataDistribution(
+ address = status.blockManagerId.hostPort,
+ memoryUsed = status.memUsedByRdd(rddId),
+ memoryRemaining = status.memRemaining,
+ diskUsed = status.diskUsedByRdd(rddId)
+ ) } )
+ } else {
+ None
+ }
+ val partitions = if (includeDetails) {
+ Some(blocks.map { case (id, block, locations) =>
+ new RDDPartitionInfo(
+ blockName = id.name,
+ storageLevel = block.storageLevel.description,
+ memoryUsed = block.memSize,
+ diskUsed = block.diskSize,
+ executors = locations
+ )
+ } )
+ } else {
+ None
+ }
+
+ new RDDStorageInfo(
+ id = rddId,
+ name = rddInfo.name,
+ numPartitions = rddInfo.numPartitions,
+ numCachedPartitions = rddInfo.numCachedPartitions,
+ storageLevel = rddInfo.storageLevel.description,
+ memoryUsed = rddInfo.memSize,
+ diskUsed = rddInfo.diskSize,
+ dataDistribution = dataDistribution,
+ partitions = partitions
+ )
+ }
+}
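Correspondingly, the rdd list endpoint returns summaries (includeDetails = false), while the single-rdd endpoint fills in dataDistribution and partitions; a sketch of the detailed shape, with invented values and field names per RDDStorageInfo above:

    GET /api/v1/applications/[app-id]/storage/rdd/0

    {
      "id" : 0,
      "name" : "0",
      "numPartitions" : 8,
      "numCachedPartitions" : 8,
      "storageLevel" : "Memory Deserialized 1x Replicated",
      "memoryUsed" : 28000128,
      "diskUsed" : 0,
      "dataDistribution" : [ {
        "address" : "localhost:57971",
        "memoryUsed" : 28000128,
        "memoryRemaining" : 250302428,
        "diskUsed" : 0
      } ],
      "partitions" : [ {
        "blockName" : "rdd_0_0",
        "storageLevel" : "Memory Deserialized 1x Replicated",
        "memoryUsed" : 3500016,
        "diskUsed" : 0,
        "executors" : [ "localhost:57971" ]
      } ]
    }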
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
new file mode 100644
index 0000000..5060858
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status.api.v1
+
+import java.util.{Arrays, Date, List => JList}
+import javax.ws.rs.{GET, PathParam, Produces, QueryParam}
+import javax.ws.rs.core.MediaType
+
+import org.apache.spark.executor.{InputMetrics => InternalInputMetrics, OutputMetrics => InternalOutputMetrics, ShuffleReadMetrics => InternalShuffleReadMetrics, ShuffleWriteMetrics => InternalShuffleWriteMetrics, TaskMetrics => InternalTaskMetrics}
+import org.apache.spark.scheduler.{AccumulableInfo => InternalAccumulableInfo, StageInfo}
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.ui.jobs.UIData.{StageUIData, TaskUIData}
+import org.apache.spark.util.Distribution
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class AllStagesResource(ui: SparkUI) {
+
+ @GET
+ def stageList(@QueryParam("status") statuses: JList[StageStatus]): Seq[StageData] = {
+ val listener = ui.jobProgressListener
+ val stageAndStatus = AllStagesResource.stagesAndStatus(ui)
+ val adjStatuses = {
+ if (statuses.isEmpty()) {
+ Arrays.asList(StageStatus.values(): _*)
+ } else {
+ statuses
+ }
+ }
+ for {
+ (status, stageList) <- stageAndStatus
+ stageInfo: StageInfo <- stageList if adjStatuses.contains(status)
+ stageUiData: StageUIData <- listener.synchronized {
+ listener.stageIdToData.get((stageInfo.stageId, stageInfo.attemptId))
+ }
+ } yield {
+ AllStagesResource.stageUiToStageData(status, stageInfo, stageUiData, includeDetails = false)
+ }
+ }
+}
+
+private[v1] object AllStagesResource {
+ def stageUiToStageData(
+ status: StageStatus,
+ stageInfo: StageInfo,
+ stageUiData: StageUIData,
+ includeDetails: Boolean): StageData = {
+
+ val taskData = if (includeDetails) {
+ Some(stageUiData.taskData.map { case (k, v) => k -> convertTaskData(v) } )
+ } else {
+ None
+ }
+ val executorSummary = if (includeDetails) {
+ Some(stageUiData.executorSummary.map { case (k, summary) =>
+ k -> new ExecutorStageSummary(
+ taskTime = summary.taskTime,
+ failedTasks = summary.failedTasks,
+ succeededTasks = summary.succeededTasks,
+ inputBytes = summary.inputBytes,
+ outputBytes = summary.outputBytes,
+ shuffleRead = summary.shuffleRead,
+ shuffleWrite = summary.shuffleWrite,
+ memoryBytesSpilled = summary.memoryBytesSpilled,
+ diskBytesSpilled = summary.diskBytesSpilled
+ )
+ })
+ } else {
+ None
+ }
+
+ val accumulableInfo = stageUiData.accumulables.values.map { convertAccumulableInfo }.toSeq
+
+ new StageData(
+ status = status,
+ stageId = stageInfo.stageId,
+ attemptId = stageInfo.attemptId,
+ numActiveTasks = stageUiData.numActiveTasks,
+ numCompleteTasks = stageUiData.numCompleteTasks,
+ numFailedTasks = stageUiData.numFailedTasks,
+ executorRunTime = stageUiData.executorRunTime,
+ inputBytes = stageUiData.inputBytes,
+ inputRecords = stageUiData.inputRecords,
+ outputBytes = stageUiData.outputBytes,
+ outputRecords = stageUiData.outputRecords,
+ shuffleReadBytes = stageUiData.shuffleReadTotalBytes,
+ shuffleReadRecords = stageUiData.shuffleReadRecords,
+ shuffleWriteBytes = stageUiData.shuffleWriteBytes,
+ shuffleWriteRecords = stageUiData.shuffleWriteRecords,
+ memoryBytesSpilled = stageUiData.memoryBytesSpilled,
+ diskBytesSpilled = stageUiData.diskBytesSpilled,
+ schedulingPool = stageUiData.schedulingPool,
+ name = stageInfo.name,
+ details = stageInfo.details,
+ accumulatorUpdates = accumulableInfo,
+ tasks = taskData,
+ executorSummary = executorSummary
+ )
+ }
+
+ def stagesAndStatus(ui: SparkUI): Seq[(StageStatus, Seq[StageInfo])] = {
+ val listener = ui.jobProgressListener
+ listener.synchronized {
+ Seq(
+ StageStatus.ACTIVE -> listener.activeStages.values.toSeq,
+ StageStatus.COMPLETE -> listener.completedStages.reverse.toSeq,
+ StageStatus.FAILED -> listener.failedStages.reverse.toSeq,
+ StageStatus.PENDING -> listener.pendingStages.values.toSeq
+ )
+ }
+ }
+
+ def convertTaskData(uiData: TaskUIData): TaskData = {
+ new TaskData(
+ taskId = uiData.taskInfo.taskId,
+ index = uiData.taskInfo.index,
+ attempt = uiData.taskInfo.attempt,
+ launchTime = new Date(uiData.taskInfo.launchTime),
+ executorId = uiData.taskInfo.executorId,
+ host = uiData.taskInfo.host,
+ taskLocality = uiData.taskInfo.taskLocality.toString(),
+ speculative = uiData.taskInfo.speculative,
+ accumulatorUpdates = uiData.taskInfo.accumulables.map { convertAccumulableInfo },
+ errorMessage = uiData.errorMessage,
+ taskMetrics = uiData.taskMetrics.map { convertUiTaskMetrics }
+ )
+ }
+
+ def taskMetricDistributions(
+ allTaskData: Iterable[TaskUIData],
+ quantiles: Array[Double]): TaskMetricDistributions = {
+
+ val rawMetrics = allTaskData.flatMap{_.taskMetrics}.toSeq
+
+ def metricQuantiles(f: InternalTaskMetrics => Double): IndexedSeq[Double] =
+ Distribution(rawMetrics.map { d => f(d) }).get.getQuantiles(quantiles)
+
+ // We need to do a lot of similar munging to nested metrics here. For each one,
+ // we want (a) extract the values for nested metrics (b) make a distribution for each metric
+ // (c) shove the distribution into the right field in our return type and (d) only return
+ // a result if the option is defined for any of the tasks. MetricHelper is a little util
+ // to make it a little easier to deal w/ all of the nested options. Mostly it lets us just
+ // implement one "build" method, which just builds the quantiles for each field.
+
+ val inputMetrics: Option[InputMetricDistributions] =
+ new MetricHelper[InternalInputMetrics, InputMetricDistributions](rawMetrics, quantiles) {
+ def getSubmetrics(raw: InternalTaskMetrics): Option[InternalInputMetrics] = {
+ raw.inputMetrics
+ }
+
+ def build: InputMetricDistributions = new InputMetricDistributions(
+ bytesRead = submetricQuantiles(_.bytesRead),
+ recordsRead = submetricQuantiles(_.recordsRead)
+ )
+ }.metricOption
+
+ val outputMetrics: Option[OutputMetricDistributions] =
+ new MetricHelper[InternalOutputMetrics, OutputMetricDistributions](rawMetrics, quantiles) {
+ def getSubmetrics(raw:InternalTaskMetrics): Option[InternalOutputMetrics] = {
+ raw.outputMetrics
+ }
+ def build: OutputMetricDistributions = new OutputMetricDistributions(
+ bytesWritten = submetricQuantiles(_.bytesWritten),
+ recordsWritten = submetricQuantiles(_.recordsWritten)
+ )
+ }.metricOption
+
+ val shuffleReadMetrics: Option[ShuffleReadMetricDistributions] =
+ new MetricHelper[InternalShuffleReadMetrics, ShuffleReadMetricDistributions](rawMetrics,
+ quantiles) {
+ def getSubmetrics(raw: InternalTaskMetrics): Option[InternalShuffleReadMetrics] = {
+ raw.shuffleReadMetrics
+ }
+ def build: ShuffleReadMetricDistributions = new ShuffleReadMetricDistributions(
+ readBytes = submetricQuantiles(_.totalBytesRead),
+ readRecords = submetricQuantiles(_.recordsRead),
+ remoteBytesRead = submetricQuantiles(_.remoteBytesRead),
+ remoteBlocksFetched = submetricQuantiles(_.remoteBlocksFetched),
+ localBlocksFetched = submetricQuantiles(_.localBlocksFetched),
+ totalBlocksFetched = submetricQuantiles(_.totalBlocksFetched),
+ fetchWaitTime = submetricQuantiles(_.fetchWaitTime)
+ )
+ }.metricOption
+
+ val shuffleWriteMetrics: Option[ShuffleWriteMetricDistributions] =
+ new MetricHelper[InternalShuffleWriteMetrics, ShuffleWriteMetricDistributions](rawMetrics,
+ quantiles) {
+ def getSubmetrics(raw: InternalTaskMetrics): Option[InternalShuffleWriteMetrics] = {
+ raw.shuffleWriteMetrics
+ }
+ def build: ShuffleWriteMetricDistributions = new ShuffleWriteMetricDistributions(
+ writeBytes = submetricQuantiles(_.shuffleBytesWritten),
+ writeRecords = submetricQuantiles(_.shuffleRecordsWritten),
+ writeTime = submetricQuantiles(_.shuffleWriteTime)
+ )
+ }.metricOption
+
+ new TaskMetricDistributions(
+ quantiles = quantiles,
+ executorDeserializeTime = metricQuantiles(_.executorDeserializeTime),
+ executorRunTime = metricQuantiles(_.executorRunTime),
+ resultSize = metricQuantiles(_.resultSize),
+ jvmGcTime = metricQuantiles(_.jvmGCTime),
+ resultSerializationTime = metricQuantiles(_.resultSerializationTime),
+ memoryBytesSpilled = metricQuantiles(_.memoryBytesSpilled),
+ diskBytesSpilled = metricQuantiles(_.diskBytesSpilled),
+ inputMetrics = inputMetrics,
+ outputMetrics = outputMetrics,
+ shuffleReadMetrics = shuffleReadMetrics,
+ shuffleWriteMetrics = shuffleWriteMetrics
+ )
+ }
+
+ def convertAccumulableInfo(acc: InternalAccumulableInfo): AccumulableInfo = {
+ new AccumulableInfo(acc.id, acc.name, acc.update, acc.value)
+ }
+
+ def convertUiTaskMetrics(internal: InternalTaskMetrics): TaskMetrics = {
+ new TaskMetrics(
+ executorDeserializeTime = internal.executorDeserializeTime,
+ executorRunTime = internal.executorRunTime,
+ resultSize = internal.resultSize,
+ jvmGcTime = internal.jvmGCTime,
+ resultSerializationTime = internal.resultSerializationTime,
+ memoryBytesSpilled = internal.memoryBytesSpilled,
+ diskBytesSpilled = internal.diskBytesSpilled,
+ inputMetrics = internal.inputMetrics.map { convertInputMetrics },
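+      // outputMetrics is additionally wrapped in Option(...) because, unlike the
+      // other sub-metrics, it may come through as null rather than None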
+ outputMetrics = Option(internal.outputMetrics).flatten.map { convertOutputMetrics },
+ shuffleReadMetrics = internal.shuffleReadMetrics.map { convertShuffleReadMetrics },
+ shuffleWriteMetrics = internal.shuffleWriteMetrics.map { convertShuffleWriteMetrics }
+ )
+ }
+
+ def convertInputMetrics(internal: InternalInputMetrics): InputMetrics = {
+ new InputMetrics(
+ bytesRead = internal.bytesRead,
+ recordsRead = internal.recordsRead
+ )
+ }
+
+ def convertOutputMetrics(internal: InternalOutputMetrics): OutputMetrics = {
+ new OutputMetrics(
+ bytesWritten = internal.bytesWritten,
+ recordsWritten = internal.recordsWritten
+ )
+ }
+
+ def convertShuffleReadMetrics(internal: InternalShuffleReadMetrics): ShuffleReadMetrics = {
+ new ShuffleReadMetrics(
+ remoteBlocksFetched = internal.remoteBlocksFetched,
+ localBlocksFetched = internal.localBlocksFetched,
+ fetchWaitTime = internal.fetchWaitTime,
+ remoteBytesRead = internal.remoteBytesRead,
+ totalBlocksFetched = internal.totalBlocksFetched,
+ recordsRead = internal.recordsRead
+ )
+ }
+
+ def convertShuffleWriteMetrics(internal: InternalShuffleWriteMetrics): ShuffleWriteMetrics = {
+ new ShuffleWriteMetrics(
+ bytesWritten = internal.shuffleBytesWritten,
+ writeTime = internal.shuffleWriteTime,
+ recordsWritten = internal.shuffleRecordsWritten
+ )
+ }
+}
+
+/**
+ * Helper for getting distributions from nested metric types. Many of the metrics we want are
+ * contained in options inside TaskMetrics (e.g., ShuffleWriteMetrics). This makes it easy to handle
+ * the options (returning None if the metrics are all empty), and extract the quantiles for each
+ * metric. After creating an instance, call metricOption to get the result type.
+ */
+private[v1] abstract class MetricHelper[I, O](
+ rawMetrics: Seq[InternalTaskMetrics],
+ quantiles: Array[Double]) {
+
+ def getSubmetrics(raw: InternalTaskMetrics): Option[I]
+
+ def build: O
+
+ val data: Seq[I] = rawMetrics.flatMap(getSubmetrics)
+
+  /** Applies the given function to each extracted submetric and returns the quantiles. */
+ def submetricQuantiles(f: I => Double): IndexedSeq[Double] = {
+ Distribution(data.map { d => f(d) }).get.getQuantiles(quantiles)
+ }
+
+ def metricOption: Option[O] = {
+ if (data.isEmpty) {
+ None
+ } else {
+ Some(build)
+ }
+ }
+}
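To make the MetricHelper pattern concrete, here is a minimal sketch of what one more
subclass would look like; the InternalFooMetrics type and its fooTime field are
hypothetical stand-ins, and only the shape mirrors the real subclasses above:

    val fooMetrics: Option[FooMetricDistributions] =
      new MetricHelper[InternalFooMetrics, FooMetricDistributions](rawMetrics, quantiles) {
        // pull the nested option out of each task's metrics
        def getSubmetrics(raw: InternalTaskMetrics): Option[InternalFooMetrics] =
          raw.fooMetrics
        // build the public result from per-field quantiles; metricOption only calls
        // this when at least one task actually had the submetric defined
        def build: FooMetricDistributions = new FooMetricDistributions(
          fooTime = submetricQuantiles(_.fooTime))
      }.metricOption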
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala
new file mode 100644
index 0000000..17b521f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status.api.v1
+
+import java.util.{Arrays, Date, List => JList}
+import javax.ws.rs.{DefaultValue, GET, Produces, QueryParam}
+import javax.ws.rs.core.MediaType
+
+import org.apache.spark.deploy.history.ApplicationHistoryInfo
+import org.apache.spark.deploy.master.{ApplicationInfo => InternalApplicationInfo}
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class ApplicationListResource(uiRoot: UIRoot) {
+
+ @GET
+ def appList(
+ @QueryParam("status") status: JList[ApplicationStatus],
+ @DefaultValue("2010-01-01") @QueryParam("minDate") minDate: SimpleDateParam,
+ @DefaultValue("3000-01-01") @QueryParam("maxDate") maxDate: SimpleDateParam)
+ : Iterator[ApplicationInfo] = {
+ val allApps = uiRoot.getApplicationInfoList
+ val adjStatus = {
+ if (status.isEmpty) {
+ Arrays.asList(ApplicationStatus.values(): _*)
+ } else {
+ status
+ }
+ }
+ val includeCompleted = adjStatus.contains(ApplicationStatus.COMPLETED)
+ val includeRunning = adjStatus.contains(ApplicationStatus.RUNNING)
+ allApps.filter { app =>
+ val anyRunning = app.attempts.exists(!_.completed)
+ // if any attempt is still running, we consider the app to also still be running
+ val statusOk = (!anyRunning && includeCompleted) ||
+ (anyRunning && includeRunning)
+ // keep the app if *any* attempts fall in the right time window
+ val dateOk = app.attempts.exists { attempt =>
+ attempt.startTime.getTime >= minDate.timestamp &&
+ attempt.startTime.getTime <= maxDate.timestamp
+ }
+ statusOk && dateOk
+ }
+ }
+}
+
+private[spark] object ApplicationsListResource {
+ def appHistoryInfoToPublicAppInfo(app: ApplicationHistoryInfo): ApplicationInfo = {
+ new ApplicationInfo(
+ id = app.id,
+ name = app.name,
+ attempts = app.attempts.map { internalAttemptInfo =>
+ new ApplicationAttemptInfo(
+ attemptId = internalAttemptInfo.attemptId,
+ startTime = new Date(internalAttemptInfo.startTime),
+ endTime = new Date(internalAttemptInfo.endTime),
+ sparkUser = internalAttemptInfo.sparkUser,
+ completed = internalAttemptInfo.completed
+ )
+ }
+ )
+ }
+
+ def convertApplicationInfo(
+ internal: InternalApplicationInfo,
+ completed: Boolean): ApplicationInfo = {
+ // standalone application info always has just one attempt
+ new ApplicationInfo(
+ id = internal.id,
+ name = internal.desc.name,
+ attempts = Seq(new ApplicationAttemptInfo(
+ attemptId = None,
+ startTime = new Date(internal.startTime),
+ endTime = new Date(internal.endTime),
+ sparkUser = internal.desc.user,
+ completed = completed
+ ))
+ )
+ }
+
+}
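Combined with the routing in JsonRootResource further down, the resource above answers
requests like the one sketched here; the host and port (a history server on its default
port) are assumptions, not part of the patch:

    // fetch completed apps with an attempt started on or after 2015-02-10; the
    // response is pretty-printed json (see SerializationFeature.INDENT_OUTPUT below)
    val url = "http://localhost:18080/json/v1/applications?status=COMPLETED&minDate=2015-02-10"
    val json = scala.io.Source.fromURL(url).mkString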
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala
new file mode 100644
index 0000000..8ad4656
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status.api.v1
+
+import javax.ws.rs.{GET, PathParam, Produces}
+import javax.ws.rs.core.MediaType
+
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.ui.exec.ExecutorsPage
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class ExecutorListResource(ui: SparkUI) {
+
+ @GET
+ def executorList(): Seq[ExecutorSummary] = {
+ val listener = ui.executorsListener
+ val storageStatusList = listener.storageStatusList
+ (0 until storageStatusList.size).map { statusId =>
+ ExecutorsPage.getExecInfo(listener, statusId)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala b/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala
new file mode 100644
index 0000000..202a519
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status.api.v1
+
+import java.io.OutputStream
+import java.lang.annotation.Annotation
+import java.lang.reflect.Type
+import java.text.SimpleDateFormat
+import java.util.{Calendar, SimpleTimeZone}
+import javax.ws.rs.Produces
+import javax.ws.rs.core.{MediaType, MultivaluedMap}
+import javax.ws.rs.ext.{MessageBodyWriter, Provider}
+
+import com.fasterxml.jackson.annotation.JsonInclude
+import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature}
+
+/**
+ * This class converts the POJO metric responses into json, using jackson.
+ *
+ * This doesn't follow the standard jersey-jackson plugin options, because we want to stick
+ * with an old version of jersey (since we have it from yarn anyway) and don't want to pull in lots
+ * of dependencies from a new plugin.
+ *
+ * Note that jersey automatically discovers this class based on its package and its annotations.
+ */
+@Provider
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class JacksonMessageWriter extends MessageBodyWriter[Object] {
+
+  // a plain ObjectMapper is all we need; the scala module registered below
+  // handles the case classes in api.scala
+  val mapper = new ObjectMapper()
+ mapper.registerModule(com.fasterxml.jackson.module.scala.DefaultScalaModule)
+ mapper.enable(SerializationFeature.INDENT_OUTPUT)
+ mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)
+ mapper.setDateFormat(JacksonMessageWriter.makeISODateFormat)
+
+ override def isWriteable(
+ aClass: Class[_],
+ `type`: Type,
+ annotations: Array[Annotation],
+ mediaType: MediaType): Boolean = {
+ true
+ }
+
+ override def writeTo(
+ t: Object,
+ aClass: Class[_],
+ `type`: Type,
+ annotations: Array[Annotation],
+ mediaType: MediaType,
+ multivaluedMap: MultivaluedMap[String, AnyRef],
+ outputStream: OutputStream): Unit = {
+ t match {
+ case ErrorWrapper(err) => outputStream.write(err.getBytes("utf-8"))
+ case _ => mapper.writeValue(outputStream, t)
+ }
+ }
+
+ override def getSize(
+ t: Object,
+ aClass: Class[_],
+ `type`: Type,
+ annotations: Array[Annotation],
+ mediaType: MediaType): Long = {
+ -1L
+ }
+}
+
+private[spark] object JacksonMessageWriter {
+ def makeISODateFormat: SimpleDateFormat = {
+ val iso8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'GMT'")
+ val cal = Calendar.getInstance(new SimpleTimeZone(0, "GMT"))
+ iso8601.setCalendar(cal)
+ iso8601
+ }
+}
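As a quick sanity check of the timestamp convention (a throwaway snippet, not part of the
patch): the formatter's calendar is pinned to GMT, so the epoch renders as a fixed string.

    val fmt = JacksonMessageWriter.makeISODateFormat
    fmt.format(new java.util.Date(0L))  // "1970-01-01T00:00:00.000GMT"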
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/status/api/v1/JsonRootResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/JsonRootResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/JsonRootResource.scala
new file mode 100644
index 0000000..c3ec45f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/JsonRootResource.scala
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status.api.v1
+
+import javax.servlet.ServletContext
+import javax.ws.rs._
+import javax.ws.rs.core.{Context, Response}
+
+import com.sun.jersey.api.core.ResourceConfig
+import com.sun.jersey.spi.container.servlet.ServletContainer
+import org.eclipse.jetty.server.handler.ContextHandler
+import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
+
+import org.apache.spark.SecurityManager
+import org.apache.spark.ui.SparkUI
+
+/**
+ * Main entry point for serving spark application metrics as json, using JAX-RS.
+ *
+ * Each resource should have endpoints that return **public** classes defined in api.scala. Mima
+ * binary compatibility checks ensure that we don't inadvertently make changes that break the api.
+ * The returned objects are automatically converted to json by jackson with JacksonMessageWriter.
+ * In addition, there are a number of tests in HistoryServerSuite that compare the json to "golden
+ * files". Any changes and additions should be reflected there as well -- see the notes in
+ * HistoryServerSuite.
+ */
+@Path("/v1")
+private[v1] class JsonRootResource extends UIRootFromServletContext {
+
+ @Path("applications")
+ def getApplicationList(): ApplicationListResource = {
+ new ApplicationListResource(uiRoot)
+ }
+
+ @Path("applications/{appId}")
+ def getApplication(): OneApplicationResource = {
+ new OneApplicationResource(uiRoot)
+ }
+
+ @Path("applications/{appId}/{attemptId}/jobs")
+ def getJobs(
+ @PathParam("appId") appId: String,
+ @PathParam("attemptId") attemptId: String): AllJobsResource = {
+ uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
+ new AllJobsResource(ui)
+ }
+ }
+
+ @Path("applications/{appId}/jobs")
+ def getJobs(@PathParam("appId") appId: String): AllJobsResource = {
+ uiRoot.withSparkUI(appId, None) { ui =>
+ new AllJobsResource(ui)
+ }
+ }
+
+ @Path("applications/{appId}/jobs/{jobId: \\d+}")
+ def getJob(@PathParam("appId") appId: String): OneJobResource = {
+ uiRoot.withSparkUI(appId, None) { ui =>
+ new OneJobResource(ui)
+ }
+ }
+
+ @Path("applications/{appId}/{attemptId}/jobs/{jobId: \\d+}")
+ def getJob(
+ @PathParam("appId") appId: String,
+ @PathParam("attemptId") attemptId: String): OneJobResource = {
+ uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
+ new OneJobResource(ui)
+ }
+ }
+
+ @Path("applications/{appId}/executors")
+ def getExecutors(@PathParam("appId") appId: String): ExecutorListResource = {
+ uiRoot.withSparkUI(appId, None) { ui =>
+ new ExecutorListResource(ui)
+ }
+ }
+
+ @Path("applications/{appId}/{attemptId}/executors")
+ def getExecutors(
+ @PathParam("appId") appId: String,
+ @PathParam("attemptId") attemptId: String): ExecutorListResource = {
+ uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
+ new ExecutorListResource(ui)
+ }
+ }
+
+ @Path("applications/{appId}/stages")
+  def getStages(@PathParam("appId") appId: String): AllStagesResource = {
+ uiRoot.withSparkUI(appId, None) { ui =>
+ new AllStagesResource(ui)
+ }
+ }
+
+ @Path("applications/{appId}/{attemptId}/stages")
+ def getStages(
+ @PathParam("appId") appId: String,
+ @PathParam("attemptId") attemptId: String): AllStagesResource= {
+ uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
+ new AllStagesResource(ui)
+ }
+ }
+
+ @Path("applications/{appId}/stages/{stageId: \\d+}")
+  def getStage(@PathParam("appId") appId: String): OneStageResource = {
+ uiRoot.withSparkUI(appId, None) { ui =>
+ new OneStageResource(ui)
+ }
+ }
+
+ @Path("applications/{appId}/{attemptId}/stages/{stageId: \\d+}")
+ def getStage(
+ @PathParam("appId") appId: String,
+ @PathParam("attemptId") attemptId: String): OneStageResource = {
+ uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
+ new OneStageResource(ui)
+ }
+ }
+
+ @Path("applications/{appId}/storage/rdd")
+ def getRdds(@PathParam("appId") appId: String): AllRDDResource = {
+ uiRoot.withSparkUI(appId, None) { ui =>
+ new AllRDDResource(ui)
+ }
+ }
+
+ @Path("applications/{appId}/{attemptId}/storage/rdd")
+ def getRdds(
+ @PathParam("appId") appId: String,
+ @PathParam("attemptId") attemptId: String): AllRDDResource = {
+ uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
+ new AllRDDResource(ui)
+ }
+ }
+
+ @Path("applications/{appId}/storage/rdd/{rddId: \\d+}")
+ def getRdd(@PathParam("appId") appId: String): OneRDDResource = {
+ uiRoot.withSparkUI(appId, None) { ui =>
+ new OneRDDResource(ui)
+ }
+ }
+
+ @Path("applications/{appId}/{attemptId}/storage/rdd/{rddId: \\d+}")
+ def getRdd(
+ @PathParam("appId") appId: String,
+ @PathParam("attemptId") attemptId: String): OneRDDResource = {
+ uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
+ new OneRDDResource(ui)
+ }
+ }
+
+}
+
+private[spark] object JsonRootResource {
+
+ def getJsonServlet(uiRoot: UIRoot): ServletContextHandler = {
+ val jerseyContext = new ServletContextHandler(ServletContextHandler.NO_SESSIONS)
+ jerseyContext.setContextPath("/json")
+    val holder: ServletHolder = new ServletHolder(classOf[ServletContainer])
+ holder.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",
+ "com.sun.jersey.api.core.PackagesResourceConfig")
+ holder.setInitParameter("com.sun.jersey.config.property.packages",
+ "org.apache.spark.status.api.v1")
+ holder.setInitParameter(ResourceConfig.PROPERTY_CONTAINER_REQUEST_FILTERS,
+ classOf[SecurityFilter].getCanonicalName)
+ UIRootFromServletContext.setUiRoot(jerseyContext, uiRoot)
+ jerseyContext.addServlet(holder, "/*")
+ jerseyContext
+ }
+}
+
+/**
+ * This trait is shared by all the root containers for application UI information --
+ * the HistoryServer, the Master UI, and the application UI. This provides the common
+ * interface needed for them all to expose application info as json.
+ */
+private[spark] trait UIRoot {
+ def getSparkUI(appKey: String): Option[SparkUI]
+ def getApplicationInfoList: Iterator[ApplicationInfo]
+
+ /**
+   * Get the Spark UI with the given appId and apply a function to it.
+   * If there is no such app, throw an appropriate exception.
+ */
+ def withSparkUI[T](appId: String, attemptId: Option[String])(f: SparkUI => T): T = {
+ val appKey = attemptId.map(appId + "/" + _).getOrElse(appId)
+ getSparkUI(appKey) match {
+ case Some(ui) =>
+ f(ui)
+ case None => throw new NotFoundException("no such app: " + appId)
+ }
+ }
+ def securityManager: SecurityManager
+}
+
+private[v1] object UIRootFromServletContext {
+
+ private val attribute = getClass.getCanonicalName
+
+ def setUiRoot(contextHandler: ContextHandler, uiRoot: UIRoot): Unit = {
+ contextHandler.setAttribute(attribute, uiRoot)
+ }
+
+ def getUiRoot(context: ServletContext): UIRoot = {
+ context.getAttribute(attribute).asInstanceOf[UIRoot]
+ }
+}
+
+private[v1] trait UIRootFromServletContext {
+ @Context
+ var servletContext: ServletContext = _
+
+ def uiRoot: UIRoot = UIRootFromServletContext.getUiRoot(servletContext)
+}
+
+private[v1] class NotFoundException(msg: String) extends WebApplicationException(
+ new NoSuchElementException(msg),
+ Response
+ .status(Response.Status.NOT_FOUND)
+ .entity(ErrorWrapper(msg))
+ .build()
+)
+
+private[v1] class BadParameterException(msg: String) extends WebApplicationException(
+ new IllegalArgumentException(msg),
+ Response
+ .status(Response.Status.BAD_REQUEST)
+ .entity(ErrorWrapper(msg))
+ .build()
+) {
+ def this(param: String, exp: String, actual: String) = {
+ this(raw"""Bad value for parameter "$param". Expected a $exp, got "$actual"""")
+ }
+}
+
+/**
+ * Signals JacksonMessageWriter not to convert the message into json (which would result in an
+ * extra set of quotes).
+ */
+private[v1] case class ErrorWrapper(s: String)
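All of the per-app resources above funnel their lookups through UIRoot.withSparkUI, so the
404 behavior is uniform. A rough sketch of the pattern, with a made-up appId and assuming
the calling code lives somewhere in the spark package:

    // resolve the UI for one attempt of an app and apply a function to it; an unknown
    // app escapes as NotFoundException, which jersey turns into a 404 response whose
    // plain-text body survives json conversion thanks to ErrorWrapper
    val stageCount = uiRoot.withSparkUI("app-20150505-0000", Some("1")) { ui =>
      ui.jobProgressListener.completedStages.size
    }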
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala
new file mode 100644
index 0000000..b5ef726
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status.api.v1
+
+import javax.ws.rs.{GET, PathParam, Produces}
+import javax.ws.rs.core.MediaType
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class OneApplicationResource(uiRoot: UIRoot) {
+
+ @GET
+ def getApp(@PathParam("appId") appId: String): ApplicationInfo = {
+    val app = uiRoot.getApplicationInfoList.find { _.id == appId }
+    app.getOrElse(throw new NotFoundException("unknown app: " + appId))
+ }
+
+}
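For reference, a response from this endpoint looks roughly like the following. This is an
illustrative sample with made-up values, not one of the golden files from
HistoryServerSuite; note that attemptId is omitted by the NON_NULL setting when it is None:

    {
      "id" : "local-1430000000000",
      "name" : "Spark shell",
      "attempts" : [ {
        "startTime" : "2015-05-05T12:00:00.000GMT",
        "endTime" : "2015-05-05T12:05:00.000GMT",
        "sparkUser" : "someuser",
        "completed" : true
      } ]
    }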
http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala
new file mode 100644
index 0000000..6d8a60d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status.api.v1
+
+import javax.ws.rs.{GET, PathParam, Produces}
+import javax.ws.rs.core.MediaType
+
+import org.apache.spark.JobExecutionStatus
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.ui.jobs.UIData.JobUIData
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class OneJobResource(ui: SparkUI) {
+
+ @GET
+ def oneJob(@PathParam("jobId") jobId: Int): JobData = {
+ val statusToJobs: Seq[(JobExecutionStatus, Seq[JobUIData])] =
+ AllJobsResource.getStatusToJobs(ui)
+    val jobOpt = statusToJobs.flatMap { _._2 }.find { jobInfo => jobInfo.jobId == jobId }
+ jobOpt.map { job =>
+ AllJobsResource.convertJobData(job, ui.jobProgressListener, false)
+ }.getOrElse {
+ throw new NotFoundException("unknown job: " + jobId)
+ }
+ }
+
+}