Posted to commits@spark.apache.org by rx...@apache.org on 2015/12/09 16:30:56 UTC

spark git commit: [SPARK-12012][SQL] Show more comprehensive PhysicalRDD metadata when visualizing SQL query plan

Repository: spark
Updated Branches:
  refs/heads/master a11321686 -> 6e1c55eac


[SPARK-12012][SQL] Show more comprehensive PhysicalRDD metadata when visualizing SQL query plan

This PR adds a `private[sql]` method `metadata` to `SparkPlan`, which can be used to describe detailed information about a physical plan during visualization. Specifically, this PR uses this method to provide details of `PhysicalRDD`s translated from a data source relation. For example, a `ParquetRelation` converted from the Hive metastore table `default.psrc` is now shown as in the following screenshot:

![image](https://cloud.githubusercontent.com/assets/230655/11526657/e10cb7e6-9916-11e5-9afa-f108932ec890.png)

And here is the screenshot for a regular `ParquetRelation` (not converted from a Hive metastore table) loaded from a really long path:

![output](https://cloud.githubusercontent.com/assets/230655/11680582/37c66460-9e94-11e5-8f50-842db5309d5a.png)
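
For illustration, here is a minimal sketch (not part of this commit) of how a physical plan node can surface extra details through the new `metadata` hook. The node name `ExampleScan` and its `paths` field are hypothetical; the `InputPaths` key reuses the constant this commit adds to the `PhysicalRDD` companion object, and the snippet assumes it is compiled under the `org.apache.spark.sql.execution` package so that the `private[sql]` members are visible:

```scala
package org.apache.spark.sql.execution

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute

// Hypothetical leaf node used only to illustrate the new `metadata` hook.
private[sql] case class ExampleScan(
    output: Seq[Attribute],
    rdd: RDD[InternalRow],
    paths: Seq[String])
  extends LeafNode {

  protected override def doExecute(): RDD[InternalRow] = rdd

  // Entries returned here are picked up by SparkPlanInfo.fromSparkPlan and
  // carried into SparkPlanGraphNode, which backs the SQL tab's plan
  // visualization shown in the screenshots above.
  override private[sql] def metadata: Map[String, String] =
    Map(PhysicalRDD.INPUT_PATHS -> paths.mkString(", "))
}
```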

Author: Cheng Lian <li...@databricks.com>

Closes #10004 from liancheng/spark-12012.physical-rdd-metadata.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6e1c55ea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6e1c55ea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6e1c55ea

Branch: refs/heads/master
Commit: 6e1c55eac4849669e119ce0d51f6d051830deb9f
Parents: a113216
Author: Cheng Lian <li...@databricks.com>
Authored: Wed Dec 9 23:30:42 2015 +0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Dec 9 23:30:42 2015 +0800

----------------------------------------------------------------------
 python/pyspark/sql/dataframe.py                 |  2 +-
 .../spark/sql/execution/ExistingRDD.scala       | 19 ++++++---
 .../apache/spark/sql/execution/SparkPlan.scala  |  5 +++
 .../spark/sql/execution/SparkPlanInfo.scala     |  3 +-
 .../spark/sql/execution/SparkStrategies.scala   |  2 +-
 .../datasources/DataSourceStrategy.scala        | 22 ++++++++--
 .../datasources/parquet/ParquetRelation.scala   | 10 +++++
 .../spark/sql/execution/ui/SparkPlanGraph.scala | 45 ++++++++++++--------
 .../apache/spark/sql/sources/interfaces.scala   |  2 +-
 .../spark/sql/execution/PlannerSuite.scala      |  2 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  7 ++-
 11 files changed, 87 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6e1c55ea/python/pyspark/sql/dataframe.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 746bb55..78ab475 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -213,7 +213,7 @@ class DataFrame(object):
 
         >>> df.explain()
         == Physical Plan ==
-        Scan PhysicalRDD[age#0,name#1]
+        Scan ExistingRDD[age#0,name#1]
 
         >>> df.explain(True)
         == Parsed Logical Plan ==

http://git-wip-us.apache.org/repos/asf/spark/blob/6e1c55ea/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 623348f..b8a4302 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -97,22 +97,31 @@ private[sql] case class LogicalRDD(
 private[sql] case class PhysicalRDD(
     output: Seq[Attribute],
     rdd: RDD[InternalRow],
-    extraInformation: String,
+    override val nodeName: String,
+    override val metadata: Map[String, String] = Map.empty,
     override val outputsUnsafeRows: Boolean = false)
   extends LeafNode {
 
   protected override def doExecute(): RDD[InternalRow] = rdd
 
-  override def simpleString: String = "Scan " + extraInformation + output.mkString("[", ",", "]")
+  override def simpleString: String = {
+    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
+    s"Scan $nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}"
+  }
 }
 
 private[sql] object PhysicalRDD {
+  // Metadata keys
+  val INPUT_PATHS = "InputPaths"
+  val PUSHED_FILTERS = "PushedFilters"
+
   def createFromDataSource(
       output: Seq[Attribute],
       rdd: RDD[InternalRow],
       relation: BaseRelation,
-      extraInformation: String = ""): PhysicalRDD = {
-    PhysicalRDD(output, rdd, relation.toString + extraInformation,
-      relation.isInstanceOf[HadoopFsRelation])
+      metadata: Map[String, String] = Map.empty): PhysicalRDD = {
+    // All HadoopFsRelations output UnsafeRows
+    val outputUnsafeRows = relation.isInstanceOf[HadoopFsRelation]
+    PhysicalRDD(output, rdd, relation.toString, metadata, outputUnsafeRows)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6e1c55ea/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index a781777..ec98f81 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -68,6 +68,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   }
 
   /**
+   * Return all metadata that describes more details of this SparkPlan.
+   */
+  private[sql] def metadata: Map[String, String] = Map.empty
+
+  /**
    * Return all metrics containing metrics of this SparkPlan.
    */
   private[sql] def metrics: Map[String, SQLMetric[_, _]] = Map.empty

http://git-wip-us.apache.org/repos/asf/spark/blob/6e1c55ea/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
index 486ce34..4f750ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
@@ -30,6 +30,7 @@ class SparkPlanInfo(
     val nodeName: String,
     val simpleString: String,
     val children: Seq[SparkPlanInfo],
+    val metadata: Map[String, String],
     val metrics: Seq[SQLMetricInfo])
 
 private[sql] object SparkPlanInfo {
@@ -41,6 +42,6 @@ private[sql] object SparkPlanInfo {
     }
     val children = plan.children.map(fromSparkPlan)
 
-    new SparkPlanInfo(plan.nodeName, plan.simpleString, children, metrics)
+    new SparkPlanInfo(plan.nodeName, plan.simpleString, children, plan.metadata, metrics)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6e1c55ea/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index f67c951..25e98c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -363,7 +363,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           expressions, nPartitions.getOrElse(numPartitions)), planLater(child)) :: Nil
       case e @ EvaluatePython(udf, child, _) =>
         BatchPythonEvaluation(udf, e.output, planLater(child)) :: Nil
-      case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd, "PhysicalRDD") :: Nil
+      case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd, "ExistingRDD") :: Nil
       case BroadcastHint(child) => apply(child)
       case _ => Nil
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/6e1c55ea/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 544d5ec..8a15a51 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import scala.collection.mutable.ArrayBuffer
+
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.{MapPartitionsRDD, RDD, UnionRDD}
 import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
@@ -25,6 +27,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, expressions}
+import org.apache.spark.sql.execution.PhysicalRDD.{INPUT_PATHS, PUSHED_FILTERS}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{StringType, StructType}
@@ -315,7 +318,20 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     // `Filter`s or cannot be handled by `relation`.
     val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)
 
-    val pushedFiltersString = pushedFilters.mkString(" PushedFilter: [", ",", "] ")
+    val metadata: Map[String, String] = {
+      val pairs = ArrayBuffer.empty[(String, String)]
+
+      if (pushedFilters.nonEmpty) {
+        pairs += (PUSHED_FILTERS -> pushedFilters.mkString("[", ", ", "]"))
+      }
+
+      relation.relation match {
+        case r: HadoopFsRelation => pairs += INPUT_PATHS -> r.paths.mkString(", ")
+        case _ =>
+      }
+
+      pairs.toMap
+    }
 
     if (projects.map(_.toAttribute) == projects &&
         projectSet.size == projects.size &&
@@ -334,7 +350,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       val scan = execution.PhysicalRDD.createFromDataSource(
         projects.map(_.toAttribute),
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
-        relation.relation, pushedFiltersString)
+        relation.relation, metadata)
       filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)
     } else {
       // Don't request columns that are only referenced by pushed filters.
@@ -344,7 +360,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       val scan = execution.PhysicalRDD.createFromDataSource(
         requestedColumns,
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
-        relation.relation, pushedFiltersString)
+        relation.relation, metadata)
       execution.Project(
         projects, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan))
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/6e1c55ea/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index bb3e278..1af2a39 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -146,6 +146,12 @@ private[sql] class ParquetRelation(
     meta
   }
 
+  override def toString: String = {
+    parameters.get(ParquetRelation.METASTORE_TABLE_NAME).map { tableName =>
+      s"${getClass.getSimpleName}: $tableName"
+    }.getOrElse(super.toString)
+  }
+
   override def equals(other: Any): Boolean = other match {
     case that: ParquetRelation =>
       val schemaEquality = if (shouldMergeSchemas) {
@@ -521,6 +527,10 @@ private[sql] object ParquetRelation extends Logging {
   // internally.
   private[sql] val METASTORE_SCHEMA = "metastoreSchema"
 
+  // If a ParquetRelation is converted from a Hive metastore table, this option is set to the
+  // original Hive table name.
+  private[sql] val METASTORE_TABLE_NAME = "metastoreTableName"
+
   /**
    * If parquet's block size (row group size) setting is larger than the min split size,
    * we use parquet's block size setting as the min split size. Otherwise, we will create

http://git-wip-us.apache.org/repos/asf/spark/blob/6e1c55ea/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
index 7af0ff0..3a6eff9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
@@ -66,7 +66,9 @@ private[sql] object SparkPlanGraph {
         SQLMetrics.getMetricParam(metric.metricParam))
     }
     val node = SparkPlanGraphNode(
-      nodeIdGenerator.getAndIncrement(), planInfo.nodeName, planInfo.simpleString, metrics)
+      nodeIdGenerator.getAndIncrement(), planInfo.nodeName,
+      planInfo.simpleString, planInfo.metadata, metrics)
+
     nodes += node
     val childrenNodes = planInfo.children.map(
       child => buildSparkPlanGraphNode(child, nodeIdGenerator, nodes, edges))
@@ -85,26 +87,33 @@ private[sql] object SparkPlanGraph {
  * @param metrics metrics that this SparkPlan node will track
  */
 private[ui] case class SparkPlanGraphNode(
-    id: Long, name: String, desc: String, metrics: Seq[SQLPlanMetric]) {
+    id: Long,
+    name: String,
+    desc: String,
+    metadata: Map[String, String],
+    metrics: Seq[SQLPlanMetric]) {
 
   def makeDotNode(metricsValue: Map[Long, String]): String = {
-    val values = {
-      for (metric <- metrics;
-           value <- metricsValue.get(metric.accumulatorId)) yield {
-        metric.name + ": " + value
-      }
+    val builder = new mutable.StringBuilder(name)
+
+    val values = for {
+      metric <- metrics
+      value <- metricsValue.get(metric.accumulatorId)
+    } yield {
+      metric.name + ": " + value
     }
-    val label = if (values.isEmpty) {
-        name
-      } else {
-        // If there are metrics, display all metrics in a separate line. We should use an escaped
-        // "\n" here to follow the dot syntax.
-        //
-        // Note: whitespace between two "\n"s is to create an empty line between the name of
-        // SparkPlan and metrics. If removing it, it won't display the empty line in UI.
-        name + "\\n \\n" + values.mkString("\\n")
-      }
-    s"""  $id [label="$label"];"""
+
+    if (values.nonEmpty) {
+      // If there are metrics, display each entry in a separate line. We should use an escaped
+      // "\n" here to follow the dot syntax.
+      //
+      // Note: whitespace between two "\n"s is to create an empty line between the name of
+      // SparkPlan and metrics. If removing it, it won't display the empty line in UI.
+      builder ++= "\\n \\n"
+      builder ++= values.mkString("\\n")
+    }
+
+    s"""  $id [label="${builder.toString()}"];"""
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6e1c55ea/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 9ace25d..fc8ce69 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -422,7 +422,7 @@ abstract class HadoopFsRelation private[sql](
     parameters: Map[String, String])
   extends BaseRelation with FileRelation with Logging {
 
-  override def toString: String = getClass.getSimpleName + paths.mkString("[", ",", "]")
+  override def toString: String = getClass.getSimpleName
 
   def this() = this(None, Map.empty[String, String])
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6e1c55ea/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index a462625..2fb439f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -169,7 +169,7 @@ class PlannerSuite extends SharedSQLContext {
 
       withTempTable("testPushed") {
         val exp = sql("select * from testPushed where key = 15").queryExecution.executedPlan
-        assert(exp.toString.contains("PushedFilter: [EqualTo(key,15)]"))
+        assert(exp.toString.contains("PushedFilters: [EqualTo(key,15)]"))
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/6e1c55ea/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 9a981d0..08b291e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -411,7 +411,12 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
     // evil case insensitivity issue, which is reconciled within `ParquetRelation`.
     val parquetOptions = Map(
       ParquetRelation.METASTORE_SCHEMA -> metastoreSchema.json,
-      ParquetRelation.MERGE_SCHEMA -> mergeSchema.toString)
+      ParquetRelation.MERGE_SCHEMA -> mergeSchema.toString,
+      ParquetRelation.METASTORE_TABLE_NAME -> TableIdentifier(
+        metastoreRelation.tableName,
+        Some(metastoreRelation.databaseName)
+      ).unquotedString
+    )
     val tableIdentifier =
       QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org