Posted to commits@spark.apache.org by ge...@apache.org on 2020/05/19 06:25:02 UTC

[spark] branch branch-3.0 updated: [SPARK-31440][SQL] Improve SQL Rest API

This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new d668d67  [SPARK-31440][SQL] Improve SQL Rest API
d668d67 is described below

commit d668d679e93dc4f84bdce15f9f6bdb19552496f1
Author: Eren Avsarogullari <er...@gmail.com>
AuthorDate: Mon May 18 23:21:32 2020 -0700

    [SPARK-31440][SQL] Improve SQL Rest API
    
    ### What changes were proposed in this pull request?
    The SQL Rest API exposes query execution metrics as a public API. This PR applies the following improvements to the SQL Rest API to align it with the Spark UI; a sketch of the improved response shape follows the list below.
    
    **Proposed Improvements:**
    1- Support physical operations and group metrics per physical operation, aligning with the Spark UI.
    2- Support `wholeStageCodegenId` for physical operations.
    3- Expose `nodeId`, which is useful for grouping metrics and for sorting physical operations by execution order, differentiating the same operator (when it is used multiple times during the same query execution) and its metrics.
    4- Filter out `empty` metrics, aligning with the Spark UI's SQL tab, which does not show empty metrics.
    5- Remove line breaks (`\n`) from `metricValue`.
    6- Make `planDescription` an optional HTTP parameter to avoid network cost, especially for complex jobs that create big plans.
    7- Expose the `metrics` attribute at the bottom, under `nodes`. This is especially useful when the `nodes` array is large.
    8- Expose the `edges` attribute to show the relationships between `nodes`.
    9- Reverse the order of `metricDetails` to match the Spark UI by following the physical operators' execution order.
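
    A minimal sketch of the improved response shape, with field names taken from `ExecutionData`, `Node` and `Metric` in `api.scala`; the values and the serialized date format shown here are illustrative, not authoritative:
    ```
    {
      "id" : 0,
      "status" : "COMPLETED",
      "description" : "csv at MyDataFrames.scala:57",
      "planDescription" : "== Physical Plan ==\nCollectLimit (3)\n...",
      "submissionTime" : "2020-04-13T08:28:08.233GMT",
      "duration" : 766,
      "runningJobIds" : [ ],
      "successJobIds" : [ 0, 1 ],
      "failedJobIds" : [ ],
      "nodes" : [ {
        "nodeId" : 2,
        "nodeName" : "Scan text",
        "metrics" : [ { "name" : "number of files read", "value" : "1" } ]
      }, {
        "nodeId" : 1,
        "nodeName" : "Filter",
        "wholeStageCodegenId" : 1,
        "metrics" : [ { "name" : "number of output rows", "value" : "1" } ]
      } ],
      "edges" : [ { "fromId" : 2, "toId" : 1 } ]
    }
    ```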
    
    ### Why are the changes needed?
    The proposed improvements provide a more useful result (e.g., correlating physical operations with their metrics, grouping) and a clearer one (e.g., filtering out blank metrics, removing line breaks) for the end user.
    
    ### Does this PR introduce any user-facing change?
    Yes. Please find both the current and improved versions of the results attached for the following SQL Rest endpoint:
    ```
    curl -X GET http://localhost:4040/api/v1/applications/$appId/sql/$executionId?details=true
    ```
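
    With this change, the potentially large plan description can also be skipped (`details` and `planDescription` both default to `true` per `SqlResource.scala`; `$appId` and `$executionId` are placeholders):
    ```
    # Fetch metrics per physical operation but skip the physical plan description
    curl -X GET "http://localhost:4040/api/v1/applications/$appId/sql/$executionId?details=true&planDescription=false"

    # List executions with paging (offset and length default to 0 and 20)
    curl -X GET "http://localhost:4040/api/v1/applications/$appId/sql?offset=0&length=20"
    ```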
    **Current version:**
    https://issues.apache.org/jira/secure/attachment/12999821/current_version.json
    
    **Improved version:**
    https://issues.apache.org/jira/secure/attachment/13000621/improved_version.json
    
    ### Backward Compatibility
    The SQL Rest API will first be exposed with `Spark 3.0`; `3.0.0-preview2` (released on 12/23/19) does not cover this API, so if this PR makes the 3.0 release, there will be no backward compatibility issue.
    
    ### How was this patch tested?
    1. New unit tests are added.
    2. The patch has also been tested manually through both the **Spark Core** and **History Server** Rest APIs, as sketched below.
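
    The History Server exposes the same resource under its own REST root; a sketch of an equivalent call, assuming the default History Server port 18080 (`$appId` is a placeholder):
    ```
    curl -X GET "http://localhost:18080/api/v1/applications/$appId/sql?details=true"
    ```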
    
    Closes #28208 from erenavsarogullari/SPARK-31440.
    
    Authored-by: Eren Avsarogullari <er...@gmail.com>
    Signed-off-by: Gengliang Wang <ge...@databricks.com>
    (cherry picked from commit ab4cf49a1ca5a2578ecd0441ad6db8e88593e648)
    Signed-off-by: Gengliang Wang <ge...@databricks.com>
---
 .../spark/sql/execution/ui/SparkPlanGraph.scala    |   6 +-
 .../spark/status/api/v1/sql/SqlResource.scala      |  90 +++++++--
 .../org/apache/spark/status/api/v1/sql/api.scala   |  15 +-
 .../spark/status/api/v1/sql/SqlResourceSuite.scala | 205 +++++++++++++++++++++
 4 files changed, 293 insertions(+), 23 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
index 274a5a4..a798fe0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
@@ -153,7 +153,7 @@ object SparkPlanGraph {
  * @param name the name of this SparkPlan node
  * @param metrics metrics that this SparkPlan node will track
  */
-private[ui] class SparkPlanGraphNode(
+class SparkPlanGraphNode(
     val id: Long,
     val name: String,
     val desc: String,
@@ -193,7 +193,7 @@ private[ui] class SparkPlanGraphNode(
 /**
  * Represent a tree of SparkPlan for WholeStageCodegen.
  */
-private[ui] class SparkPlanGraphCluster(
+class SparkPlanGraphCluster(
     id: Long,
     name: String,
     desc: String,
@@ -229,7 +229,7 @@ private[ui] class SparkPlanGraphCluster(
  * Represent an edge in the SparkPlan tree. `fromId` is the child node id, and `toId` is the parent
  * node id.
  */
-private[ui] case class SparkPlanGraphEdge(fromId: Long, toId: Long) {
+case class SparkPlanGraphEdge(fromId: Long, toId: Long) {
 
   def makeDotEdge: String = s"""  $fromId->$toId;\n"""
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala
index 346e07f..c7599f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala
@@ -21,21 +21,29 @@ import java.util.Date
 import javax.ws.rs._
 import javax.ws.rs.core.MediaType
 
+import scala.util.{Failure, Success, Try}
+
 import org.apache.spark.JobExecutionStatus
-import org.apache.spark.sql.execution.ui.{SQLAppStatusStore, SQLExecutionUIData, SQLPlanMetric}
+import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphCluster, SparkPlanGraphNode, SQLAppStatusStore, SQLExecutionUIData}
 import org.apache.spark.status.api.v1.{BaseAppResource, NotFoundException}
 
 @Produces(Array(MediaType.APPLICATION_JSON))
 private[v1] class SqlResource extends BaseAppResource {
 
+  val WHOLE_STAGE_CODEGEN = "WholeStageCodegen"
+
   @GET
   def sqlList(
-      @DefaultValue("false") @QueryParam("details") details: Boolean,
+      @DefaultValue("true") @QueryParam("details") details: Boolean,
+      @DefaultValue("true") @QueryParam("planDescription") planDescription: Boolean,
       @DefaultValue("0") @QueryParam("offset") offset: Int,
       @DefaultValue("20") @QueryParam("length") length: Int): Seq[ExecutionData] = {
     withUI { ui =>
       val sqlStore = new SQLAppStatusStore(ui.store.store)
-      sqlStore.executionsList(offset, length).map(prepareExecutionData(_, details))
+      sqlStore.executionsList(offset, length).map { exec =>
+        val graph = sqlStore.planGraph(exec.executionId)
+        prepareExecutionData(exec, graph, details, planDescription)
+      }
     }
   }
 
@@ -43,24 +51,25 @@ private[v1] class SqlResource extends BaseAppResource {
   @Path("{executionId:\\d+}")
   def sql(
       @PathParam("executionId") execId: Long,
-      @DefaultValue("false") @QueryParam("details") details: Boolean): ExecutionData = {
+      @DefaultValue("true") @QueryParam("details") details: Boolean,
+      @DefaultValue("true") @QueryParam("planDescription")
+      planDescription: Boolean): ExecutionData = {
     withUI { ui =>
       val sqlStore = new SQLAppStatusStore(ui.store.store)
+      val graph = sqlStore.planGraph(execId)
       sqlStore
         .execution(execId)
-        .map(prepareExecutionData(_, details))
-        .getOrElse(throw new NotFoundException("unknown id: " + execId))
+        .map(prepareExecutionData(_, graph, details, planDescription))
+        .getOrElse(throw new NotFoundException("unknown query execution id: " + execId))
     }
   }
 
-  private def printableMetrics(
-      metrics: Seq[SQLPlanMetric],
-      metricValues: Map[Long, String]): Seq[Metrics] = {
-    metrics.map(metric =>
-      Metrics(metric.name, metricValues.get(metric.accumulatorId).getOrElse("")))
-  }
+  private def prepareExecutionData(
+    exec: SQLExecutionUIData,
+    graph: SparkPlanGraph,
+    details: Boolean,
+    planDescription: Boolean): ExecutionData = {
 
-  private def prepareExecutionData(exec: SQLExecutionUIData, details: Boolean): ExecutionData = {
     var running = Seq[Int]()
     var completed = Seq[Int]()
     var failed = Seq[Int]()
@@ -84,18 +93,65 @@ private[v1] class SqlResource extends BaseAppResource {
     }
 
     val duration = exec.completionTime.getOrElse(new Date()).getTime - exec.submissionTime
-    val planDetails = if (details) exec.physicalPlanDescription else ""
-    val metrics = if (details) printableMetrics(exec.metrics, exec.metricValues) else Seq.empty
+    val planDetails = if (planDescription) exec.physicalPlanDescription else ""
+    val nodes = if (details) printableMetrics(graph.allNodes, exec.metricValues) else Seq.empty
+    val edges = if (details) graph.edges else Seq.empty
+
     new ExecutionData(
       exec.executionId,
       status,
       exec.description,
       planDetails,
-      metrics,
       new Date(exec.submissionTime),
       duration,
       running,
       completed,
-      failed)
+      failed,
+      nodes,
+      edges)
   }
+
+  private def printableMetrics(allNodes: Seq[SparkPlanGraphNode],
+    metricValues: Map[Long, String]): Seq[Node] = {
+
+    def getMetric(metricValues: Map[Long, String], accumulatorId: Long,
+      metricName: String): Option[Metric] = {
+
+      metricValues.get(accumulatorId).map( mv => {
+        val metricValue = if (mv.startsWith("\n")) mv.substring(1, mv.length) else mv
+        Metric(metricName, metricValue)
+      })
+    }
+
+    val nodeIdAndWSCGIdMap = getNodeIdAndWSCGIdMap(allNodes)
+    val nodes = allNodes.map { node =>
+      val wholeStageCodegenId = nodeIdAndWSCGIdMap.get(node.id).flatten
+      val metrics =
+        node.metrics.flatMap(m => getMetric(metricValues, m.accumulatorId, m.name.trim))
+      Node(nodeId = node.id, nodeName = node.name.trim, wholeStageCodegenId, metrics)
+    }
+
+    nodes.sortBy(_.nodeId).reverse
+  }
+
+  private def getNodeIdAndWSCGIdMap(allNodes: Seq[SparkPlanGraphNode]): Map[Long, Option[Long]] = {
+    val wscgNodes = allNodes.filter(_.name.trim.startsWith(WHOLE_STAGE_CODEGEN))
+    val nodeIdAndWSCGIdMap: Map[Long, Option[Long]] = wscgNodes.flatMap {
+      _ match {
+        case x: SparkPlanGraphCluster => x.nodes.map(_.id -> getWholeStageCodegenId(x.name.trim))
+        case _ => Seq.empty
+      }
+    }.toMap
+
+    nodeIdAndWSCGIdMap
+  }
+
+  private def getWholeStageCodegenId(wscgNodeName: String): Option[Long] = {
+    Try(wscgNodeName.substring(
+      s"$WHOLE_STAGE_CODEGEN (".length, wscgNodeName.length - 1).toLong) match {
+      case Success(wscgId) => Some(wscgId)
+      case Failure(t) => None
+    }
+  }
+
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala
index 7ace66f..0ddf667 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala
@@ -18,16 +18,25 @@ package org.apache.spark.status.api.v1.sql
 
 import java.util.Date
 
+import org.apache.spark.sql.execution.ui.SparkPlanGraphEdge
+
 class ExecutionData private[spark] (
     val id: Long,
     val status: String,
     val description: String,
     val planDescription: String,
-    val metrics: Seq[Metrics],
     val submissionTime: Date,
     val duration: Long,
     val runningJobIds: Seq[Int],
     val successJobIds: Seq[Int],
-    val failedJobIds: Seq[Int])
+    val failedJobIds: Seq[Int],
+    val nodes: Seq[Node],
+    val edges: Seq[SparkPlanGraphEdge])
+
+case class Node private[spark](
+    nodeId: Long,
+    nodeName: String,
+    wholeStageCodegenId: Option[Long] = None,
+    metrics: Seq[Metric])
 
-case class Metrics private[spark] (metricName: String, metricValue: String)
+case class Metric private[spark] (name: String, value: String)
diff --git a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala
new file mode 100644
index 0000000..43cca24
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1.sql
+
+import java.util.Date
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.scalatest.PrivateMethodTester
+
+import org.apache.spark.{JobExecutionStatus, SparkFunSuite}
+import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphCluster, SparkPlanGraphEdge, SparkPlanGraphNode, SQLExecutionUIData, SQLPlanMetric}
+
+object SqlResourceSuite {
+
+  val SCAN_TEXT = "Scan text"
+  val FILTER = "Filter"
+  val WHOLE_STAGE_CODEGEN_1 = "WholeStageCodegen (1)"
+  val DURATION = "duration"
+  val NUMBER_OF_OUTPUT_ROWS = "number of output rows"
+  val METADATA_TIME = "metadata time"
+  val NUMBER_OF_FILES_READ = "number of files read"
+  val SIZE_OF_FILES_READ = "size of files read"
+  val PLAN_DESCRIPTION = "== Physical Plan ==\nCollectLimit (3)\n+- * Filter (2)\n +- Scan text..."
+  val DESCRIPTION = "csv at MyDataFrames.scala:57"
+
+  val nodeIdAndWSCGIdMap: Map[Long, Option[Long]] = Map(1L -> Some(1L))
+
+  val filterNode = new SparkPlanGraphNode(1, FILTER, "",
+    metrics = Seq(SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 1, "")))
+  val nodes: Seq[SparkPlanGraphNode] = Seq(
+    new SparkPlanGraphCluster(0, WHOLE_STAGE_CODEGEN_1, "",
+      nodes = ArrayBuffer(filterNode),
+      metrics = Seq(SQLPlanMetric(DURATION, 0, ""))),
+    new SparkPlanGraphNode(2, SCAN_TEXT, "",
+      metrics = Seq(
+      SQLPlanMetric(METADATA_TIME, 2, ""),
+      SQLPlanMetric(NUMBER_OF_FILES_READ, 3, ""),
+      SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 4, ""),
+      SQLPlanMetric(SIZE_OF_FILES_READ, 5, ""))))
+
+  val nodesWhenCodegenIsOff: Seq[SparkPlanGraphNode] =
+    SparkPlanGraph(nodes, edges).allNodes.filterNot(_.name == WHOLE_STAGE_CODEGEN_1)
+
+  val edges: Seq[SparkPlanGraphEdge] =
+    Seq(SparkPlanGraphEdge(3, 2))
+
+  val metrics: Seq[SQLPlanMetric] = {
+    Seq(SQLPlanMetric(DURATION, 0, ""),
+      SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 1, ""),
+      SQLPlanMetric(METADATA_TIME, 2, ""),
+      SQLPlanMetric(NUMBER_OF_FILES_READ, 3, ""),
+      SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 4, ""),
+      SQLPlanMetric(SIZE_OF_FILES_READ, 5, ""))
+  }
+
+  val sqlExecutionUIData: SQLExecutionUIData = {
+    def getMetricValues() = {
+      Map[Long, String](
+        0L -> "0 ms",
+        1L -> "1",
+        2L -> "2 ms",
+        3L -> "1",
+        4L -> "1",
+        5L -> "330.0 B"
+      )
+    }
+
+    new SQLExecutionUIData(
+      executionId = 0,
+      description = DESCRIPTION,
+      details = "",
+      physicalPlanDescription = PLAN_DESCRIPTION,
+      metrics = metrics,
+      submissionTime = 1586768888233L,
+      completionTime = Some(new Date(1586768888999L)),
+      jobs = Map[Int, JobExecutionStatus](
+        0 -> JobExecutionStatus.SUCCEEDED,
+        1 -> JobExecutionStatus.SUCCEEDED),
+      stages = Set[Int](),
+      metricValues = getMetricValues()
+    )
+  }
+
+  private def getNodes(): Seq[Node] = {
+    val node = Node(0, WHOLE_STAGE_CODEGEN_1,
+      wholeStageCodegenId = None, metrics = Seq(Metric(DURATION, "0 ms")))
+    val node2 = Node(1, FILTER,
+      wholeStageCodegenId = Some(1), metrics = Seq(Metric(NUMBER_OF_OUTPUT_ROWS, "1")))
+    val node3 = Node(2, SCAN_TEXT, wholeStageCodegenId = None,
+      metrics = Seq(Metric(METADATA_TIME, "2 ms"),
+        Metric(NUMBER_OF_FILES_READ, "1"),
+        Metric(NUMBER_OF_OUTPUT_ROWS, "1"),
+        Metric(SIZE_OF_FILES_READ, "330.0 B")))
+
+    // reverse order because of supporting execution order by aligning with Spark-UI
+    Seq(node3, node2, node)
+  }
+
+  private def getExpectedNodesWhenWholeStageCodegenIsOff(): Seq[Node] = {
+    val node = Node(1, FILTER, metrics = Seq(Metric(NUMBER_OF_OUTPUT_ROWS, "1")))
+    val node2 = Node(2, SCAN_TEXT,
+      metrics = Seq(Metric(METADATA_TIME, "2 ms"),
+        Metric(NUMBER_OF_FILES_READ, "1"),
+        Metric(NUMBER_OF_OUTPUT_ROWS, "1"),
+        Metric(SIZE_OF_FILES_READ, "330.0 B")))
+
+    // reverse order because of supporting execution order by aligning with Spark-UI
+    Seq(node2, node)
+  }
+
+  private def verifyExpectedExecutionData(executionData: ExecutionData,
+    nodes: Seq[Node],
+    edges: Seq[SparkPlanGraphEdge],
+    planDescription: String): Unit = {
+
+    assert(executionData.id == 0)
+    assert(executionData.status == "COMPLETED")
+    assert(executionData.description == DESCRIPTION)
+    assert(executionData.planDescription == planDescription)
+    assert(executionData.submissionTime == new Date(1586768888233L))
+    assert(executionData.duration == 766L)
+    assert(executionData.successJobIds == Seq[Int](0, 1))
+    assert(executionData.runningJobIds == Seq[Int]())
+    assert(executionData.failedJobIds == Seq.empty)
+    assert(executionData.nodes == nodes)
+    assert(executionData.edges == edges)
+  }
+
+}
+
+/**
+ * Sql Resource Public API Unit Tests.
+ */
+class SqlResourceSuite extends SparkFunSuite with PrivateMethodTester {
+
+  import SqlResourceSuite._
+
+  val sqlResource = new SqlResource()
+  val prepareExecutionData = PrivateMethod[ExecutionData]('prepareExecutionData)
+
+  test("Prepare ExecutionData when details = false and planDescription = false") {
+    val executionData =
+      sqlResource invokePrivate prepareExecutionData(
+        sqlExecutionUIData, SparkPlanGraph(Seq.empty, Seq.empty), false, false)
+    verifyExpectedExecutionData(executionData, edges = Seq.empty,
+      nodes = Seq.empty, planDescription = "")
+  }
+
+  test("Prepare ExecutionData when details = true and planDescription = false") {
+    val executionData =
+      sqlResource invokePrivate prepareExecutionData(
+        sqlExecutionUIData, SparkPlanGraph(nodes, edges), true, false)
+    verifyExpectedExecutionData(
+      executionData,
+      nodes = getNodes(),
+      edges,
+      planDescription = "")
+  }
+
+  test("Prepare ExecutionData when details = true and planDescription = true") {
+    val executionData =
+      sqlResource invokePrivate prepareExecutionData(
+        sqlExecutionUIData, SparkPlanGraph(nodes, edges), true, true)
+    verifyExpectedExecutionData(
+      executionData,
+      nodes = getNodes(),
+      edges = edges,
+      planDescription = PLAN_DESCRIPTION)
+  }
+
+  test("Prepare ExecutionData when details = true and planDescription = false and WSCG = off") {
+    val executionData =
+      sqlResource invokePrivate prepareExecutionData(
+        sqlExecutionUIData, SparkPlanGraph(nodesWhenCodegenIsOff, edges), true, false)
+    verifyExpectedExecutionData(
+      executionData,
+      nodes = getExpectedNodesWhenWholeStageCodegenIsOff(),
+      edges = edges,
+      planDescription = "")
+  }
+
+  test("Parse wholeStageCodegenId from nodeName") {
+    val getWholeStageCodegenId = PrivateMethod[Option[Long]]('getWholeStageCodegenId)
+    val wholeStageCodegenId =
+      sqlResource invokePrivate getWholeStageCodegenId(WHOLE_STAGE_CODEGEN_1)
+    assert(wholeStageCodegenId == Some(1))
+  }
+
+}

