Posted to commits@spark.apache.org by we...@apache.org on 2020/05/20 12:22:57 UTC
[spark] branch branch-3.0 updated: Revert [SPARK-27142][SPARK-31440] SQL rest API in branch 3.0
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 4e95339 Revert [SPARK-27142][SPARK-31440] SQL rest API in branch 3.0
4e95339 is described below
commit 4e953394725fc1adcd620f25c6db4096ce3dfe98
Author: Gengliang Wang <ge...@databricks.com>
AuthorDate: Wed May 20 12:20:45 2020 +0000
Revert [SPARK-27142][SPARK-31440] SQL rest API in branch 3.0
### What changes were proposed in this pull request?
Revert https://github.com/apache/spark/pull/28208 and https://github.com/apache/spark/pull/24076 in branch 3.0
### Why are the changes needed?
Unfortunately, the PR https://github.com/apache/spark/pull/28208 was merged after the Spark 3.0 RC2 cut. Although the improvement is great, we can't break the release policy by adding new improvement commits to branch-3.0 now.
Also, if we are going to adopt the improvement in a future release, we should not release 3.0 with https://github.com/apache/spark/pull/24076, since the API result would then change.
After discussing with cloud-fan and gatorsmile offline, we think the best choice is to revert both commits and follow the community release policy.
### Does this PR introduce _any_ user-facing change?
Yes, the SQL REST API is held back until the next release.
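For context, the endpoints removed by this revert are the ones declared in the deleted `ApiSqlRootResource` and `SqlResource` below. A minimal sketch of the kind of request a 3.0 driver will no longer serve; the host, port, and application id are placeholders, and the query parameters mirror the deleted resource's defaults:

```scala
import scala.io.Source

object SqlRestApiSketch {
  def main(args: Array[String]): Unit = {
    // Placeholders: a driver UI on the default port 4040 and an illustrative app id.
    val appId = "app-20200520122045-0000"
    val base = s"http://localhost:4040/api/v1/applications/$appId/sql"

    // List SQL executions; defaults in the removed SqlResource were
    // details=true, planDescription=true, offset=0, length=20.
    val listUrl = s"$base?details=true&planDescription=true&offset=0&length=20"
    println(Source.fromURL(listUrl).mkString)

    // Fetch a single execution by its numeric id.
    println(Source.fromURL(s"$base/0").mkString)
  }
}
```

With both commits reverted, branch-3.0 no longer registers these paths, so such requests are expected to fail until the API ships in a later release.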
### How was this patch tested?
Jenkins unit tests.
Closes #28588 from gengliangwang/revertSQLRestAPI.
Authored-by: Gengliang Wang <ge...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../spark/sql/execution/ui/SQLAppStatusStore.scala | 4 -
.../spark/sql/execution/ui/SparkPlanGraph.scala | 6 +-
.../status/api/v1/sql/ApiSqlRootResource.scala | 29 ---
.../spark/status/api/v1/sql/SqlResource.scala | 157 ----------------
.../org/apache/spark/status/api/v1/sql/api.scala | 42 -----
.../spark/status/api/v1/sql/SqlResourceSuite.scala | 205 ---------------------
6 files changed, 3 insertions(+), 440 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
index a90f37a..c6e7f39 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
@@ -42,10 +42,6 @@ class SQLAppStatusStore(
store.view(classOf[SQLExecutionUIData]).asScala.toSeq
}
- def executionsList(offset: Int, length: Int): Seq[SQLExecutionUIData] = {
- store.view(classOf[SQLExecutionUIData]).skip(offset).max(length).asScala.toSeq
- }
-
def execution(executionId: Long): Option[SQLExecutionUIData] = {
try {
Some(store.read(classOf[SQLExecutionUIData], executionId))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
index a798fe0..274a5a4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
@@ -153,7 +153,7 @@ object SparkPlanGraph {
* @param name the name of this SparkPlan node
* @param metrics metrics that this SparkPlan node will track
*/
-class SparkPlanGraphNode(
+private[ui] class SparkPlanGraphNode(
val id: Long,
val name: String,
val desc: String,
@@ -193,7 +193,7 @@ class SparkPlanGraphNode(
/**
* Represent a tree of SparkPlan for WholeStageCodegen.
*/
-class SparkPlanGraphCluster(
+private[ui] class SparkPlanGraphCluster(
id: Long,
name: String,
desc: String,
@@ -229,7 +229,7 @@ class SparkPlanGraphCluster(
* Represent an edge in the SparkPlan tree. `fromId` is the child node id, and `toId` is the parent
* node id.
*/
-case class SparkPlanGraphEdge(fromId: Long, toId: Long) {
+private[ui] case class SparkPlanGraphEdge(fromId: Long, toId: Long) {
def makeDotEdge: String = s""" $fromId->$toId;\n"""
}
diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala
deleted file mode 100644
index 5fc7123..0000000
--- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.status.api.v1.sql
-
-import javax.ws.rs.Path
-
-import org.apache.spark.status.api.v1.ApiRequestContext
-
-@Path("/v1")
-private[v1] class ApiSqlRootResource extends ApiRequestContext {
-
- @Path("applications/{appId}/sql")
- def sqlList(): Class[SqlResource] = classOf[SqlResource]
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala
deleted file mode 100644
index c7599f8..0000000
--- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.status.api.v1.sql
-
-import java.util.Date
-import javax.ws.rs._
-import javax.ws.rs.core.MediaType
-
-import scala.util.{Failure, Success, Try}
-
-import org.apache.spark.JobExecutionStatus
-import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphCluster, SparkPlanGraphNode, SQLAppStatusStore, SQLExecutionUIData}
-import org.apache.spark.status.api.v1.{BaseAppResource, NotFoundException}
-
-@Produces(Array(MediaType.APPLICATION_JSON))
-private[v1] class SqlResource extends BaseAppResource {
-
- val WHOLE_STAGE_CODEGEN = "WholeStageCodegen"
-
- @GET
- def sqlList(
- @DefaultValue("true") @QueryParam("details") details: Boolean,
- @DefaultValue("true") @QueryParam("planDescription") planDescription: Boolean,
- @DefaultValue("0") @QueryParam("offset") offset: Int,
- @DefaultValue("20") @QueryParam("length") length: Int): Seq[ExecutionData] = {
- withUI { ui =>
- val sqlStore = new SQLAppStatusStore(ui.store.store)
- sqlStore.executionsList(offset, length).map { exec =>
- val graph = sqlStore.planGraph(exec.executionId)
- prepareExecutionData(exec, graph, details, planDescription)
- }
- }
- }
-
- @GET
- @Path("{executionId:\\d+}")
- def sql(
- @PathParam("executionId") execId: Long,
- @DefaultValue("true") @QueryParam("details") details: Boolean,
- @DefaultValue("true") @QueryParam("planDescription")
- planDescription: Boolean): ExecutionData = {
- withUI { ui =>
- val sqlStore = new SQLAppStatusStore(ui.store.store)
- val graph = sqlStore.planGraph(execId)
- sqlStore
- .execution(execId)
- .map(prepareExecutionData(_, graph, details, planDescription))
- .getOrElse(throw new NotFoundException("unknown query execution id: " + execId))
- }
- }
-
- private def prepareExecutionData(
- exec: SQLExecutionUIData,
- graph: SparkPlanGraph,
- details: Boolean,
- planDescription: Boolean): ExecutionData = {
-
- var running = Seq[Int]()
- var completed = Seq[Int]()
- var failed = Seq[Int]()
-
- exec.jobs.foreach {
- case (id, JobExecutionStatus.RUNNING) =>
- running = running :+ id
- case (id, JobExecutionStatus.SUCCEEDED) =>
- completed = completed :+ id
- case (id, JobExecutionStatus.FAILED) =>
- failed = failed :+ id
- case _ =>
- }
-
- val status = if (exec.jobs.size == completed.size) {
- "COMPLETED"
- } else if (failed.nonEmpty) {
- "FAILED"
- } else {
- "RUNNING"
- }
-
- val duration = exec.completionTime.getOrElse(new Date()).getTime - exec.submissionTime
- val planDetails = if (planDescription) exec.physicalPlanDescription else ""
- val nodes = if (details) printableMetrics(graph.allNodes, exec.metricValues) else Seq.empty
- val edges = if (details) graph.edges else Seq.empty
-
- new ExecutionData(
- exec.executionId,
- status,
- exec.description,
- planDetails,
- new Date(exec.submissionTime),
- duration,
- running,
- completed,
- failed,
- nodes,
- edges)
- }
-
- private def printableMetrics(allNodes: Seq[SparkPlanGraphNode],
- metricValues: Map[Long, String]): Seq[Node] = {
-
- def getMetric(metricValues: Map[Long, String], accumulatorId: Long,
- metricName: String): Option[Metric] = {
-
- metricValues.get(accumulatorId).map( mv => {
- val metricValue = if (mv.startsWith("\n")) mv.substring(1, mv.length) else mv
- Metric(metricName, metricValue)
- })
- }
-
- val nodeIdAndWSCGIdMap = getNodeIdAndWSCGIdMap(allNodes)
- val nodes = allNodes.map { node =>
- val wholeStageCodegenId = nodeIdAndWSCGIdMap.get(node.id).flatten
- val metrics =
- node.metrics.flatMap(m => getMetric(metricValues, m.accumulatorId, m.name.trim))
- Node(nodeId = node.id, nodeName = node.name.trim, wholeStageCodegenId, metrics)
- }
-
- nodes.sortBy(_.nodeId).reverse
- }
-
- private def getNodeIdAndWSCGIdMap(allNodes: Seq[SparkPlanGraphNode]): Map[Long, Option[Long]] = {
- val wscgNodes = allNodes.filter(_.name.trim.startsWith(WHOLE_STAGE_CODEGEN))
- val nodeIdAndWSCGIdMap: Map[Long, Option[Long]] = wscgNodes.flatMap {
- _ match {
- case x: SparkPlanGraphCluster => x.nodes.map(_.id -> getWholeStageCodegenId(x.name.trim))
- case _ => Seq.empty
- }
- }.toMap
-
- nodeIdAndWSCGIdMap
- }
-
- private def getWholeStageCodegenId(wscgNodeName: String): Option[Long] = {
- Try(wscgNodeName.substring(
- s"$WHOLE_STAGE_CODEGEN (".length, wscgNodeName.length - 1).toLong) match {
- case Success(wscgId) => Some(wscgId)
- case Failure(t) => None
- }
- }
-
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala
deleted file mode 100644
index 0ddf667..0000000
--- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.status.api.v1.sql
-
-import java.util.Date
-
-import org.apache.spark.sql.execution.ui.SparkPlanGraphEdge
-
-class ExecutionData private[spark] (
- val id: Long,
- val status: String,
- val description: String,
- val planDescription: String,
- val submissionTime: Date,
- val duration: Long,
- val runningJobIds: Seq[Int],
- val successJobIds: Seq[Int],
- val failedJobIds: Seq[Int],
- val nodes: Seq[Node],
- val edges: Seq[SparkPlanGraphEdge])
-
-case class Node private[spark](
- nodeId: Long,
- nodeName: String,
- wholeStageCodegenId: Option[Long] = None,
- metrics: Seq[Metric])
-
-case class Metric private[spark] (name: String, value: String)
diff --git a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala
deleted file mode 100644
index 43cca24..0000000
--- a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.status.api.v1.sql
-
-import java.util.Date
-
-import scala.collection.mutable.ArrayBuffer
-
-import org.scalatest.PrivateMethodTester
-
-import org.apache.spark.{JobExecutionStatus, SparkFunSuite}
-import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphCluster, SparkPlanGraphEdge, SparkPlanGraphNode, SQLExecutionUIData, SQLPlanMetric}
-
-object SqlResourceSuite {
-
- val SCAN_TEXT = "Scan text"
- val FILTER = "Filter"
- val WHOLE_STAGE_CODEGEN_1 = "WholeStageCodegen (1)"
- val DURATION = "duration"
- val NUMBER_OF_OUTPUT_ROWS = "number of output rows"
- val METADATA_TIME = "metadata time"
- val NUMBER_OF_FILES_READ = "number of files read"
- val SIZE_OF_FILES_READ = "size of files read"
- val PLAN_DESCRIPTION = "== Physical Plan ==\nCollectLimit (3)\n+- * Filter (2)\n +- Scan text..."
- val DESCRIPTION = "csv at MyDataFrames.scala:57"
-
- val nodeIdAndWSCGIdMap: Map[Long, Option[Long]] = Map(1L -> Some(1L))
-
- val filterNode = new SparkPlanGraphNode(1, FILTER, "",
- metrics = Seq(SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 1, "")))
- val nodes: Seq[SparkPlanGraphNode] = Seq(
- new SparkPlanGraphCluster(0, WHOLE_STAGE_CODEGEN_1, "",
- nodes = ArrayBuffer(filterNode),
- metrics = Seq(SQLPlanMetric(DURATION, 0, ""))),
- new SparkPlanGraphNode(2, SCAN_TEXT, "",
- metrics = Seq(
- SQLPlanMetric(METADATA_TIME, 2, ""),
- SQLPlanMetric(NUMBER_OF_FILES_READ, 3, ""),
- SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 4, ""),
- SQLPlanMetric(SIZE_OF_FILES_READ, 5, ""))))
-
- val nodesWhenCodegenIsOff: Seq[SparkPlanGraphNode] =
- SparkPlanGraph(nodes, edges).allNodes.filterNot(_.name == WHOLE_STAGE_CODEGEN_1)
-
- val edges: Seq[SparkPlanGraphEdge] =
- Seq(SparkPlanGraphEdge(3, 2))
-
- val metrics: Seq[SQLPlanMetric] = {
- Seq(SQLPlanMetric(DURATION, 0, ""),
- SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 1, ""),
- SQLPlanMetric(METADATA_TIME, 2, ""),
- SQLPlanMetric(NUMBER_OF_FILES_READ, 3, ""),
- SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 4, ""),
- SQLPlanMetric(SIZE_OF_FILES_READ, 5, ""))
- }
-
- val sqlExecutionUIData: SQLExecutionUIData = {
- def getMetricValues() = {
- Map[Long, String](
- 0L -> "0 ms",
- 1L -> "1",
- 2L -> "2 ms",
- 3L -> "1",
- 4L -> "1",
- 5L -> "330.0 B"
- )
- }
-
- new SQLExecutionUIData(
- executionId = 0,
- description = DESCRIPTION,
- details = "",
- physicalPlanDescription = PLAN_DESCRIPTION,
- metrics = metrics,
- submissionTime = 1586768888233L,
- completionTime = Some(new Date(1586768888999L)),
- jobs = Map[Int, JobExecutionStatus](
- 0 -> JobExecutionStatus.SUCCEEDED,
- 1 -> JobExecutionStatus.SUCCEEDED),
- stages = Set[Int](),
- metricValues = getMetricValues()
- )
- }
-
- private def getNodes(): Seq[Node] = {
- val node = Node(0, WHOLE_STAGE_CODEGEN_1,
- wholeStageCodegenId = None, metrics = Seq(Metric(DURATION, "0 ms")))
- val node2 = Node(1, FILTER,
- wholeStageCodegenId = Some(1), metrics = Seq(Metric(NUMBER_OF_OUTPUT_ROWS, "1")))
- val node3 = Node(2, SCAN_TEXT, wholeStageCodegenId = None,
- metrics = Seq(Metric(METADATA_TIME, "2 ms"),
- Metric(NUMBER_OF_FILES_READ, "1"),
- Metric(NUMBER_OF_OUTPUT_ROWS, "1"),
- Metric(SIZE_OF_FILES_READ, "330.0 B")))
-
- // reverse order because of supporting execution order by aligning with Spark-UI
- Seq(node3, node2, node)
- }
-
- private def getExpectedNodesWhenWholeStageCodegenIsOff(): Seq[Node] = {
- val node = Node(1, FILTER, metrics = Seq(Metric(NUMBER_OF_OUTPUT_ROWS, "1")))
- val node2 = Node(2, SCAN_TEXT,
- metrics = Seq(Metric(METADATA_TIME, "2 ms"),
- Metric(NUMBER_OF_FILES_READ, "1"),
- Metric(NUMBER_OF_OUTPUT_ROWS, "1"),
- Metric(SIZE_OF_FILES_READ, "330.0 B")))
-
- // reverse order because of supporting execution order by aligning with Spark-UI
- Seq(node2, node)
- }
-
- private def verifyExpectedExecutionData(executionData: ExecutionData,
- nodes: Seq[Node],
- edges: Seq[SparkPlanGraphEdge],
- planDescription: String): Unit = {
-
- assert(executionData.id == 0)
- assert(executionData.status == "COMPLETED")
- assert(executionData.description == DESCRIPTION)
- assert(executionData.planDescription == planDescription)
- assert(executionData.submissionTime == new Date(1586768888233L))
- assert(executionData.duration == 766L)
- assert(executionData.successJobIds == Seq[Int](0, 1))
- assert(executionData.runningJobIds == Seq[Int]())
- assert(executionData.failedJobIds == Seq.empty)
- assert(executionData.nodes == nodes)
- assert(executionData.edges == edges)
- }
-
-}
-
-/**
- * Sql Resource Public API Unit Tests.
- */
-class SqlResourceSuite extends SparkFunSuite with PrivateMethodTester {
-
- import SqlResourceSuite._
-
- val sqlResource = new SqlResource()
- val prepareExecutionData = PrivateMethod[ExecutionData]('prepareExecutionData)
-
- test("Prepare ExecutionData when details = false and planDescription = false") {
- val executionData =
- sqlResource invokePrivate prepareExecutionData(
- sqlExecutionUIData, SparkPlanGraph(Seq.empty, Seq.empty), false, false)
- verifyExpectedExecutionData(executionData, edges = Seq.empty,
- nodes = Seq.empty, planDescription = "")
- }
-
- test("Prepare ExecutionData when details = true and planDescription = false") {
- val executionData =
- sqlResource invokePrivate prepareExecutionData(
- sqlExecutionUIData, SparkPlanGraph(nodes, edges), true, false)
- verifyExpectedExecutionData(
- executionData,
- nodes = getNodes(),
- edges,
- planDescription = "")
- }
-
- test("Prepare ExecutionData when details = true and planDescription = true") {
- val executionData =
- sqlResource invokePrivate prepareExecutionData(
- sqlExecutionUIData, SparkPlanGraph(nodes, edges), true, true)
- verifyExpectedExecutionData(
- executionData,
- nodes = getNodes(),
- edges = edges,
- planDescription = PLAN_DESCRIPTION)
- }
-
- test("Prepare ExecutionData when details = true and planDescription = false and WSCG = off") {
- val executionData =
- sqlResource invokePrivate prepareExecutionData(
- sqlExecutionUIData, SparkPlanGraph(nodesWhenCodegenIsOff, edges), true, false)
- verifyExpectedExecutionData(
- executionData,
- nodes = getExpectedNodesWhenWholeStageCodegenIsOff(),
- edges = edges,
- planDescription = "")
- }
-
- test("Parse wholeStageCodegenId from nodeName") {
- val getWholeStageCodegenId = PrivateMethod[Option[Long]]('getWholeStageCodegenId)
- val wholeStageCodegenId =
- sqlResource invokePrivate getWholeStageCodegenId(WHOLE_STAGE_CODEGEN_1)
- assert(wholeStageCodegenId == Some(1))
- }
-
-}