Posted to commits@spark.apache.org by da...@apache.org on 2016/01/25 21:47:42 UTC

spark git commit: [SPARK-12902] [SQL] visualization for generated operators

Repository: spark
Updated Branches:
  refs/heads/master c037d2548 -> 7d877c343


[SPARK-12902] [SQL] visualization for generated operators

This PR brings back visualization for generated operators; they look like:

![sql](https://cloud.githubusercontent.com/assets/40902/12460920/0dc7956a-bf6b-11e5-9c3f-8389f452526e.png)

![stage](https://cloud.githubusercontent.com/assets/40902/12460923/11806ac4-bf6b-11e5-9c72-e84a62c5ea93.png)

Note: SQL metrics are not supported right now because they are very slow; they will be supported once we have batch mode.
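
As a reference, here is a minimal spark-shell session that exercises the new
visualization (the exact plan shape is an assumption based on the test added
in SQLMetricsSuite below):

    // Build a query that whole-stage codegen can collapse, then run it and
    // open the SQL tab in the web UI to see the clustered plan graph.
    val df = sqlContext.range(10).filter('id < 5)
    println(df.queryExecution.executedPlan)  // expected: WholeStageCodegen wrapping Filter and Range
    df.count()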

Author: Davies Liu <da...@databricks.com>

Closes #10828 from davies/viz_codegen.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7d877c34
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7d877c34
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7d877c34

Branch: refs/heads/master
Commit: 7d877c3439d872ec2a9e07d245e9c96174c0cf00
Parents: c037d25
Author: Davies Liu <da...@databricks.com>
Authored: Mon Jan 25 12:44:20 2016 -0800
Committer: Davies Liu <da...@gmail.com>
Committed: Mon Jan 25 12:44:20 2016 -0800

----------------------------------------------------------------------
 .../org/apache/spark/ui/static/spark-dag-viz.js |  2 +-
 .../spark/ui/scope/RDDOperationGraph.scala      |  6 +-
 .../sql/execution/ui/static/spark-sql-viz.css   |  6 ++
 .../spark/sql/execution/SparkPlanInfo.scala     |  9 +-
 .../spark/sql/execution/ui/ExecutionPage.scala  |  4 +-
 .../spark/sql/execution/ui/SQLListener.scala    |  2 +-
 .../spark/sql/execution/ui/SparkPlanGraph.scala | 95 +++++++++++++++-----
 .../sql/execution/metric/SQLMetricsSuite.scala  | 10 ++-
 .../sql/execution/ui/SQLListenerSuite.scala     |  2 +-
 9 files changed, 104 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7d877c34/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
----------------------------------------------------------------------
diff --git a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
index 83dbea4..4337c42 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
@@ -284,7 +284,7 @@ function renderDot(dot, container, forJob) {
   renderer(container, g);
 
   // Find the stage cluster and mark it for styling and post-processing
-  container.selectAll("g.cluster[name*=\"Stage\"]").classed("stage", true);
+  container.selectAll("g.cluster[name^=\"Stage \"]").classed("stage", true);
 }
 
 /* -------------------- *
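
The selector tightening above matters presumably because the DAG viz now also
draws clusters named "WholeStageCodegen", whose names contain the substring
"Stage" and would be wrongly styled as stages by the old contains-match. A
tiny Scala sketch of the two predicates (illustration only; the real matching
is done by the CSS attribute selectors "name*=" vs "name^="):

    val names = Seq("Stage 1", "WholeStageCodegen")
    names.filter(_.contains("Stage"))     // old behavior: matches both names
    names.filter(_.startsWith("Stage "))  // new behavior: matches only "Stage 1"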

http://git-wip-us.apache.org/repos/asf/spark/blob/7d877c34/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
index 06da74f..003c218 100644
--- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
+++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
@@ -130,7 +130,11 @@ private[ui] object RDDOperationGraph extends Logging {
           }
         }
         // Attach the outermost cluster to the root cluster, and the RDD to the innermost cluster
-        rddClusters.headOption.foreach { cluster => rootCluster.attachChildCluster(cluster) }
+        rddClusters.headOption.foreach { cluster =>
+          if (!rootCluster.childClusters.contains(cluster)) {
+            rootCluster.attachChildCluster(cluster)
+          }
+        }
         rddClusters.lastOption.foreach { cluster => cluster.attachChildNode(node) }
       }
     }
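
The new guard makes the attach idempotent: since this runs once per RDD, RDDs
that share the same outermost scope would otherwise attach the same head
cluster to the root repeatedly. A minimal sketch of the idea (simplified
hypothetical types, not the real RDDOperationCluster API):

    import scala.collection.mutable
    class Cluster(val name: String) {
      val childClusters = mutable.ArrayBuffer[Cluster]()
      def attachChildCluster(c: Cluster): Unit = childClusters += c
    }
    val root = new Cluster("stage"); val scope = new Cluster("scope")
    for (_ <- 1 to 2) {  // simulate two RDDs sharing one outermost scope
      if (!root.childClusters.contains(scope)) root.attachChildCluster(scope)
    }
    assert(root.childClusters.size == 1)  // attached once, not duplicated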

http://git-wip-us.apache.org/repos/asf/spark/blob/7d877c34/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css
----------------------------------------------------------------------
diff --git a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css
index ddd3a91..303f8eb 100644
--- a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css
+++ b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css
@@ -20,6 +20,12 @@
   text-shadow: none;
 }
 
+#plan-viz-graph svg g.cluster rect {
+  fill: #A0DFFF;
+  stroke: #3EC0FF;
+  stroke-width: 1px;
+}
+
 #plan-viz-graph svg g.node rect {
   fill: #C3EBFF;
   stroke: #3EC0FF;

http://git-wip-us.apache.org/repos/asf/spark/blob/7d877c34/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
index 4f750ad..4dd9928 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
@@ -36,12 +36,17 @@ class SparkPlanInfo(
 private[sql] object SparkPlanInfo {
 
   def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
+    val children = plan match {
+      case WholeStageCodegen(child, _) => child :: Nil
+      case InputAdapter(child) => child :: Nil
+      case plan => plan.children
+    }
     val metrics = plan.metrics.toSeq.map { case (key, metric) =>
       new SQLMetricInfo(metric.name.getOrElse(key), metric.id,
         Utils.getFormattedClassName(metric.param))
     }
-    val children = plan.children.map(fromSparkPlan)
 
-    new SparkPlanInfo(plan.nodeName, plan.simpleString, children, plan.metadata, metrics)
+    new SparkPlanInfo(plan.nodeName, plan.simpleString, children.map(fromSparkPlan),
+      plan.metadata, metrics)
   }
 }
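
With this change, fromSparkPlan treats WholeStageCodegen and InputAdapter as
transparent single-child wrappers, so the SparkPlanInfo tree descends into the
operators that were collapsed into generated code instead of jumping straight
to the codegen inputs. A hedged usage sketch (fromSparkPlan is private[sql],
so this only compiles from inside the org.apache.spark.sql package, and the
plan shape is an assumption):

    val df = sqlContext.range(10).filter('id < 5)
    val info = SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan)
    // Expected tree: WholeStageCodegen -> Filter -> Range
    println(info.nodeName + " -> " + info.children.map(_.nodeName).mkString(", "))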

http://git-wip-us.apache.org/repos/asf/spark/blob/7d877c34/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
index c74ad40..49915ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
@@ -99,7 +99,7 @@ private[sql] class ExecutionPage(parent: SQLTab) extends WebUIPage("execution")
   }
 
   private def planVisualization(metrics: Map[Long, String], graph: SparkPlanGraph): Seq[Node] = {
-    val metadata = graph.nodes.flatMap { node =>
+    val metadata = graph.allNodes.flatMap { node =>
       val nodeId = s"plan-meta-data-${node.id}"
       <div id={nodeId}>{node.desc}</div>
     }
@@ -110,7 +110,7 @@ private[sql] class ExecutionPage(parent: SQLTab) extends WebUIPage("execution")
         <div class="dot-file">
           {graph.makeDotFile(metrics)}
         </div>
-        <div id="plan-viz-metadata-size">{graph.nodes.size.toString}</div>
+        <div id="plan-viz-metadata-size">{graph.allNodes.size.toString}</div>
         {metadata}
       </div>
       {planVisualizationResources}

http://git-wip-us.apache.org/repos/asf/spark/blob/7d877c34/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index cd56136..83c64f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -219,7 +219,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
     case SparkListenerSQLExecutionStart(executionId, description, details,
       physicalPlanDescription, sparkPlanInfo, time) =>
       val physicalPlanGraph = SparkPlanGraph(sparkPlanInfo)
-      val sqlPlanMetrics = physicalPlanGraph.nodes.flatMap { node =>
+      val sqlPlanMetrics = physicalPlanGraph.allNodes.flatMap { node =>
         node.metrics.map(metric => metric.accumulatorId -> metric)
       }
       val executionUIData = new SQLExecutionUIData(

http://git-wip-us.apache.org/repos/asf/spark/blob/7d877c34/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
index 3a6eff9..4eb2485 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.mutable
 
-import org.apache.spark.sql.execution.SparkPlanInfo
+import org.apache.spark.sql.execution.{InputAdapter, SparkPlanInfo, WholeStageCodegen}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 
 /**
@@ -41,6 +41,16 @@ private[ui] case class SparkPlanGraph(
     dotFile.append("}")
     dotFile.toString()
   }
+
+  /**
+    * All the SparkPlanGraphNodes, including those inside of WholeStageCodegen.
+    */
+  val allNodes: Seq[SparkPlanGraphNode] = {
+    nodes.flatMap {
+      case cluster: SparkPlanGraphCluster => cluster.nodes :+ cluster
+      case node => Seq(node)
+    }
+  }
 }
 
 private[sql] object SparkPlanGraph {
@@ -52,7 +62,7 @@ private[sql] object SparkPlanGraph {
     val nodeIdGenerator = new AtomicLong(0)
     val nodes = mutable.ArrayBuffer[SparkPlanGraphNode]()
     val edges = mutable.ArrayBuffer[SparkPlanGraphEdge]()
-    buildSparkPlanGraphNode(planInfo, nodeIdGenerator, nodes, edges)
+    buildSparkPlanGraphNode(planInfo, nodeIdGenerator, nodes, edges, null, null)
     new SparkPlanGraph(nodes, edges)
   }
 
@@ -60,22 +70,40 @@ private[sql] object SparkPlanGraph {
       planInfo: SparkPlanInfo,
       nodeIdGenerator: AtomicLong,
       nodes: mutable.ArrayBuffer[SparkPlanGraphNode],
-      edges: mutable.ArrayBuffer[SparkPlanGraphEdge]): SparkPlanGraphNode = {
-    val metrics = planInfo.metrics.map { metric =>
-      SQLPlanMetric(metric.name, metric.accumulatorId,
-        SQLMetrics.getMetricParam(metric.metricParam))
+      edges: mutable.ArrayBuffer[SparkPlanGraphEdge],
+      parent: SparkPlanGraphNode,
+      subgraph: SparkPlanGraphCluster): Unit = {
+    if (planInfo.nodeName == classOf[WholeStageCodegen].getSimpleName) {
+      val cluster = new SparkPlanGraphCluster(
+        nodeIdGenerator.getAndIncrement(),
+        planInfo.nodeName,
+        planInfo.simpleString,
+        mutable.ArrayBuffer[SparkPlanGraphNode]())
+      nodes += cluster
+      buildSparkPlanGraphNode(
+        planInfo.children.head, nodeIdGenerator, nodes, edges, parent, cluster)
+    } else if (planInfo.nodeName == classOf[InputAdapter].getSimpleName) {
+      buildSparkPlanGraphNode(planInfo.children.head, nodeIdGenerator, nodes, edges, parent, null)
+    } else {
+      val metrics = planInfo.metrics.map { metric =>
+        SQLPlanMetric(metric.name, metric.accumulatorId,
+          SQLMetrics.getMetricParam(metric.metricParam))
+      }
+      val node = new SparkPlanGraphNode(
+        nodeIdGenerator.getAndIncrement(), planInfo.nodeName,
+        planInfo.simpleString, planInfo.metadata, metrics)
+      if (subgraph == null) {
+        nodes += node
+      } else {
+        subgraph.nodes += node
+      }
+
+      if (parent != null) {
+        edges += SparkPlanGraphEdge(node.id, parent.id)
+      }
+      planInfo.children.foreach(
+        buildSparkPlanGraphNode(_, nodeIdGenerator, nodes, edges, node, subgraph))
     }
-    val node = SparkPlanGraphNode(
-      nodeIdGenerator.getAndIncrement(), planInfo.nodeName,
-      planInfo.simpleString, planInfo.metadata, metrics)
-
-    nodes += node
-    val childrenNodes = planInfo.children.map(
-      child => buildSparkPlanGraphNode(child, nodeIdGenerator, nodes, edges))
-    for (child <- childrenNodes) {
-      edges += SparkPlanGraphEdge(child.id, node.id)
-    }
-    node
   }
 }
 
@@ -86,12 +114,12 @@ private[sql] object SparkPlanGraph {
  * @param name the name of this SparkPlan node
  * @param metrics metrics that this SparkPlan node will track
  */
-private[ui] case class SparkPlanGraphNode(
-    id: Long,
-    name: String,
-    desc: String,
-    metadata: Map[String, String],
-    metrics: Seq[SQLPlanMetric]) {
+private[ui] class SparkPlanGraphNode(
+    val id: Long,
+    val name: String,
+    val desc: String,
+    val metadata: Map[String, String],
+    val metrics: Seq[SQLPlanMetric]) {
 
   def makeDotNode(metricsValue: Map[Long, String]): String = {
     val builder = new mutable.StringBuilder(name)
@@ -118,6 +146,27 @@ private[ui] case class SparkPlanGraphNode(
 }
 
 /**
+  * Represent a tree of SparkPlan for WholeStageCodegen.
+  */
+private[ui] class SparkPlanGraphCluster(
+    id: Long,
+    name: String,
+    desc: String,
+    val nodes: mutable.ArrayBuffer[SparkPlanGraphNode])
+  extends SparkPlanGraphNode(id, name, desc, Map.empty, Nil) {
+
+  override def makeDotNode(metricsValue: Map[Long, String]): String = {
+    s"""
+       |  subgraph cluster${id} {
+       |    label=${name};
+       |    ${nodes.map(_.makeDotNode(metricsValue)).mkString("    \n")}
+       |  }
+     """.stripMargin
+  }
+}
+
+
+/**
  * Represent an edge in the SparkPlan tree. `fromId` is the parent node id, and `toId` is the child
  * node id.
  */
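
The allNodes flattening added above is what the call sites in ExecutionPage,
SQLListener and the tests switch to: it walks one level into each cluster and
also keeps the cluster itself. A small standalone sketch of that flattening
(simplified stand-ins for SparkPlanGraphNode/SparkPlanGraphCluster, with an
assumed plan of one codegen cluster plus an Exchange):

    import scala.collection.mutable
    class Node(val name: String)
    class Cluster(name: String, val inner: mutable.ArrayBuffer[Node]) extends Node(name)
    val graphNodes: Seq[Node] = Seq(
      new Cluster("WholeStageCodegen", mutable.ArrayBuffer(new Node("Filter"), new Node("Range"))),
      new Node("Exchange"))
    val allNodes = graphNodes.flatMap {
      case c: Cluster => c.inner :+ c  // inner nodes first, then the cluster itself
      case n => Seq(n)
    }
    assert(allNodes.map(_.name) == Seq("Filter", "Range", "WholeStageCodegen", "Exchange"))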

http://git-wip-us.apache.org/repos/asf/spark/blob/7d877c34/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 5128543..cbae19e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -86,7 +86,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
       // If we can track all jobs, check the metric values
       val metricValues = sqlContext.listener.getExecutionMetrics(executionId)
       val actualMetrics = SparkPlanGraph(SparkPlanInfo.fromSparkPlan(
-        df.queryExecution.executedPlan)).nodes.filter { node =>
+        df.queryExecution.executedPlan)).allNodes.filter { node =>
         expectedMetrics.contains(node.id)
       }.map { node =>
         val nodeMetrics = node.metrics.map { metric =>
@@ -134,6 +134,14 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
     )
   }
 
+  test("WholeStageCodegen metrics") {
+    // Assume the execution plan is
+    // WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Filter(nodeId = 1))
+    // TODO: update metrics in generated operators
+    val df = sqlContext.range(10).filter('id < 5)
+    testSparkPlanMetrics(df, 1, Map.empty)
+  }
+
   test("TungstenAggregate metrics") {
     // Assume the execution plan is
     // ... -> TungstenAggregate(nodeId = 2) -> Exchange(nodeId = 1)

http://git-wip-us.apache.org/repos/asf/spark/blob/7d877c34/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index eef3c1f..81a159d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -83,7 +83,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
     val df = createTestDataFrame
     val accumulatorIds =
       SparkPlanGraph(SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan))
-        .nodes.flatMap(_.metrics.map(_.accumulatorId))
+        .allNodes.flatMap(_.metrics.map(_.accumulatorId))
     // Assume all accumulators are long
     var accumulatorValue = 0L
     val accumulatorUpdates = accumulatorIds.map { id =>

