Posted to commits@spark.apache.org by yh...@apache.org on 2015/12/10 19:20:51 UTC
spark git commit: [SPARK-12012][SQL][BRANCH-1.6] Show more comprehensive PhysicalRDD metadata when visualizing SQL query plan
Repository: spark
Updated Branches:
refs/heads/branch-1.6 93ef24638 -> e541f703d
[SPARK-12012][SQL][BRANCH-1.6] Show more comprehensive PhysicalRDD metadata when visualizing SQL query plan
This PR backports PR #10004 to branch-1.6.
It adds a private[sql] method metadata to SparkPlan, which can be used to describe detailed information about a physical plan during visualization. Specifically, this PR uses this method to provide details of PhysicalRDDs translated from a data source relation.
Author: Cheng Lian <li...@databricks.com>
Closes #10250 from liancheng/spark-12012.for-1.6.
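
For illustration only (not part of this patch), a minimal Spark 1.6 usage sketch of the user-visible effect described above; the input path, column names, and the plan layout shown in the comments are hypothetical:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object ExplainMetadataDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("explain-metadata-demo"))
    val sqlContext = new SQLContext(sc)

    // Hypothetical Parquet data set; any HadoopFsRelation-based source is described similarly.
    val df = sqlContext.read.parquet("/tmp/demo_table")

    // With this change, the scan node reports the relation name plus metadata entries
    // such as InputPaths and PushedFilters instead of a single free-form string.
    df.filter(df("key") === 15).explain()
    // Expected to contain a line roughly like (exact layout may differ):
    //   Scan ParquetRelation[key#0,value#1] InputPaths: file:/tmp/demo_table, PushedFilters: [EqualTo(key,15)]

    sc.stop()
  }
}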
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e541f703
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e541f703
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e541f703
Branch: refs/heads/branch-1.6
Commit: e541f703d72d3dd3ad96db55650c5b1a1a5a38e2
Parents: 93ef246
Author: Cheng Lian <li...@databricks.com>
Authored: Thu Dec 10 10:19:44 2015 -0800
Committer: Yin Huai <yh...@databricks.com>
Committed: Thu Dec 10 10:19:49 2015 -0800
----------------------------------------------------------------------
python/pyspark/sql/dataframe.py | 2 +-
.../spark/sql/execution/ExistingRDD.scala | 19 ++++++---
.../apache/spark/sql/execution/SparkPlan.scala | 5 +++
.../spark/sql/execution/SparkStrategies.scala | 2 +-
.../datasources/DataSourceStrategy.scala | 22 ++++++++--
.../datasources/parquet/ParquetRelation.scala | 10 +++++
.../spark/sql/execution/ui/SparkPlanGraph.scala | 43 ++++++++++++--------
.../apache/spark/sql/sources/interfaces.scala | 2 +-
.../spark/sql/execution/PlannerSuite.scala | 2 +-
.../spark/sql/hive/HiveMetastoreCatalog.scala | 7 +++-
10 files changed, 83 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/e541f703/python/pyspark/sql/dataframe.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 746bb55..78ab475 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -213,7 +213,7 @@ class DataFrame(object):
>>> df.explain()
== Physical Plan ==
- Scan PhysicalRDD[age#0,name#1]
+ Scan ExistingRDD[age#0,name#1]
>>> df.explain(True)
== Parsed Logical Plan ==
http://git-wip-us.apache.org/repos/asf/spark/blob/e541f703/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 623348f..b8a4302 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -97,22 +97,31 @@ private[sql] case class LogicalRDD(
private[sql] case class PhysicalRDD(
output: Seq[Attribute],
rdd: RDD[InternalRow],
- extraInformation: String,
+ override val nodeName: String,
+ override val metadata: Map[String, String] = Map.empty,
override val outputsUnsafeRows: Boolean = false)
extends LeafNode {
protected override def doExecute(): RDD[InternalRow] = rdd
- override def simpleString: String = "Scan " + extraInformation + output.mkString("[", ",", "]")
+ override def simpleString: String = {
+ val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
+ s"Scan $nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}"
+ }
}
private[sql] object PhysicalRDD {
+ // Metadata keys
+ val INPUT_PATHS = "InputPaths"
+ val PUSHED_FILTERS = "PushedFilters"
+
def createFromDataSource(
output: Seq[Attribute],
rdd: RDD[InternalRow],
relation: BaseRelation,
- extraInformation: String = ""): PhysicalRDD = {
- PhysicalRDD(output, rdd, relation.toString + extraInformation,
- relation.isInstanceOf[HadoopFsRelation])
+ metadata: Map[String, String] = Map.empty): PhysicalRDD = {
+ // All HadoopFsRelations output UnsafeRows
+ val outputUnsafeRows = relation.isInstanceOf[HadoopFsRelation]
+ PhysicalRDD(output, rdd, relation.toString, metadata, outputUnsafeRows)
}
}
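
Not part of this patch: a self-contained sketch (hypothetical names, no Spark dependency) of the string the new simpleString produces from a metadata map, matching the formatting in the hunk above:

object SimpleStringSketch {
  // Mirrors PhysicalRDD.simpleString: sorted "key: value" entries appended after the output schema.
  def render(nodeName: String, output: Seq[String], metadata: Map[String, String]): String = {
    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
    s"Scan $nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}"
  }

  def main(args: Array[String]): Unit = {
    println(render(
      "ParquetRelation",
      Seq("age#0", "name#1"),
      Map("PushedFilters" -> "[EqualTo(key,15)]", "InputPaths" -> "file:/tmp/demo_table")))
    // Scan ParquetRelation[age#0,name#1] InputPaths: file:/tmp/demo_table, PushedFilters: [EqualTo(key,15)]
  }
}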
http://git-wip-us.apache.org/repos/asf/spark/blob/e541f703/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index a781777..ec98f81 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -68,6 +68,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
}
/**
+ * Return all metadata that describes more details of this SparkPlan.
+ */
+ private[sql] def metadata: Map[String, String] = Map.empty
+
+ /**
* Return all metrics containing metrics of this SparkPlan.
*/
private[sql] def metrics: Map[String, SQLMetric[_, _]] = Map.empty
http://git-wip-us.apache.org/repos/asf/spark/blob/e541f703/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index f67c951..25e98c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -363,7 +363,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
expressions, nPartitions.getOrElse(numPartitions)), planLater(child)) :: Nil
case e @ EvaluatePython(udf, child, _) =>
BatchPythonEvaluation(udf, e.output, planLater(child)) :: Nil
- case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd, "PhysicalRDD") :: Nil
+ case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd, "ExistingRDD") :: Nil
case BroadcastHint(child) => apply(child)
case _ => Nil
}
http://git-wip-us.apache.org/repos/asf/spark/blob/e541f703/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 544d5ec..8a15a51 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.datasources
+import scala.collection.mutable.ArrayBuffer
+
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.{MapPartitionsRDD, RDD, UnionRDD}
import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
@@ -25,6 +27,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, expressions}
+import org.apache.spark.sql.execution.PhysicalRDD.{INPUT_PATHS, PUSHED_FILTERS}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{StringType, StructType}
@@ -315,7 +318,20 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
// `Filter`s or cannot be handled by `relation`.
val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)
- val pushedFiltersString = pushedFilters.mkString(" PushedFilter: [", ",", "] ")
+ val metadata: Map[String, String] = {
+ val pairs = ArrayBuffer.empty[(String, String)]
+
+ if (pushedFilters.nonEmpty) {
+ pairs += (PUSHED_FILTERS -> pushedFilters.mkString("[", ", ", "]"))
+ }
+
+ relation.relation match {
+ case r: HadoopFsRelation => pairs += INPUT_PATHS -> r.paths.mkString(", ")
+ case _ =>
+ }
+
+ pairs.toMap
+ }
if (projects.map(_.toAttribute) == projects &&
projectSet.size == projects.size &&
@@ -334,7 +350,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
val scan = execution.PhysicalRDD.createFromDataSource(
projects.map(_.toAttribute),
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
- relation.relation, pushedFiltersString)
+ relation.relation, metadata)
filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)
} else {
// Don't request columns that are only referenced by pushed filters.
@@ -344,7 +360,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
val scan = execution.PhysicalRDD.createFromDataSource(
requestedColumns,
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
- relation.relation, pushedFiltersString)
+ relation.relation, metadata)
execution.Project(
projects, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan))
}
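
Not part of this patch: a standalone sketch of the metadata-map construction above, where optional entries are collected into a buffer and emitted only when present (the keys match the patch, the values are hypothetical):

import scala.collection.mutable.ArrayBuffer

object ScanMetadataSketch {
  val INPUT_PATHS = "InputPaths"
  val PUSHED_FILTERS = "PushedFilters"

  // Mirrors the DataSourceStrategy logic: include PushedFilters only when filters
  // were actually pushed down, and InputPaths only for file-based relations.
  def buildMetadata(pushedFilters: Seq[String], inputPaths: Option[Seq[String]]): Map[String, String] = {
    val pairs = ArrayBuffer.empty[(String, String)]
    if (pushedFilters.nonEmpty) {
      pairs += (PUSHED_FILTERS -> pushedFilters.mkString("[", ", ", "]"))
    }
    inputPaths.foreach { paths =>
      pairs += (INPUT_PATHS -> paths.mkString(", "))
    }
    pairs.toMap
  }

  def main(args: Array[String]): Unit = {
    println(buildMetadata(Seq("EqualTo(key,15)"), Some(Seq("file:/tmp/demo_table"))))
    // Map(PushedFilters -> [EqualTo(key,15)], InputPaths -> file:/tmp/demo_table)
  }
}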
http://git-wip-us.apache.org/repos/asf/spark/blob/e541f703/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index fdd745f..4f1685a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -146,6 +146,12 @@ private[sql] class ParquetRelation(
meta
}
+ override def toString: String = {
+ parameters.get(ParquetRelation.METASTORE_TABLE_NAME).map { tableName =>
+ s"${getClass.getSimpleName}: $tableName"
+ }.getOrElse(super.toString)
+ }
+
override def equals(other: Any): Boolean = other match {
case that: ParquetRelation =>
val schemaEquality = if (shouldMergeSchemas) {
@@ -521,6 +527,10 @@ private[sql] object ParquetRelation extends Logging {
// internally.
private[sql] val METASTORE_SCHEMA = "metastoreSchema"
+ // If a ParquetRelation is converted from a Hive metastore table, this option is set to the
+ // original Hive table name.
+ private[sql] val METASTORE_TABLE_NAME = "metastoreTableName"
+
/**
* If parquet's block size (row group size) setting is larger than the min split size,
* we use parquet's block size setting as the min split size. Otherwise, we will create
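
Not part of this patch: a small sketch of the naming pattern used above, preferring the metastore table name carried in the options map and falling back to the plain class name (all values hypothetical):

object RelationNameSketch {
  val METASTORE_TABLE_NAME = "metastoreTableName"

  // Mirrors ParquetRelation.toString: "ParquetRelation: db.table" when the option is set,
  // otherwise just the class name.
  def describe(className: String, parameters: Map[String, String]): String =
    parameters.get(METASTORE_TABLE_NAME)
      .map(tableName => s"$className: $tableName")
      .getOrElse(className)

  def main(args: Array[String]): Unit = {
    println(describe("ParquetRelation", Map(METASTORE_TABLE_NAME -> "default.demo_table")))
    // ParquetRelation: default.demo_table
    println(describe("ParquetRelation", Map.empty))
    // ParquetRelation
  }
}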
http://git-wip-us.apache.org/repos/asf/spark/blob/e541f703/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
index f1fce54..7a8002c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
@@ -66,7 +66,7 @@ private[sql] object SparkPlanGraph {
metric.param.asInstanceOf[SQLMetricParam[SQLMetricValue[Any], Any]])
}
val node = SparkPlanGraphNode(
- nodeIdGenerator.getAndIncrement(), plan.nodeName, plan.simpleString, metrics)
+ nodeIdGenerator.getAndIncrement(), plan.nodeName, plan.simpleString, plan.metadata, metrics)
nodes += node
val childrenNodes = plan.children.map(
child => buildSparkPlanGraphNode(child, nodeIdGenerator, nodes, edges))
@@ -85,26 +85,33 @@ private[sql] object SparkPlanGraph {
* @param metrics metrics that this SparkPlan node will track
*/
private[ui] case class SparkPlanGraphNode(
- id: Long, name: String, desc: String, metrics: Seq[SQLPlanMetric]) {
+ id: Long,
+ name: String,
+ desc: String,
+ metadata: Map[String, String],
+ metrics: Seq[SQLPlanMetric]) {
def makeDotNode(metricsValue: Map[Long, String]): String = {
- val values = {
- for (metric <- metrics;
- value <- metricsValue.get(metric.accumulatorId)) yield {
- metric.name + ": " + value
- }
+ val builder = new mutable.StringBuilder(name)
+
+ val values = for {
+ metric <- metrics
+ value <- metricsValue.get(metric.accumulatorId)
+ } yield {
+ metric.name + ": " + value
+ }
+
+ if (values.nonEmpty) {
+ // If there are metrics, display each entry in a separate line. We should use an escaped
+ // "\n" here to follow the dot syntax.
+ //
+ // Note: whitespace between two "\n"s is to create an empty line between the name of
+ // SparkPlan and metrics. If removing it, it won't display the empty line in UI.
+ builder ++= "\\n \\n"
+ builder ++= values.mkString("\\n")
}
- val label = if (values.isEmpty) {
- name
- } else {
- // If there are metrics, display all metrics in a separate line. We should use an escaped
- // "\n" here to follow the dot syntax.
- //
- // Note: whitespace between two "\n"s is to create an empty line between the name of
- // SparkPlan and metrics. If removing it, it won't display the empty line in UI.
- name + "\\n \\n" + values.mkString("\\n")
- }
- s""" $id [label="$label"];"""
+
+ s""" $id [label="${builder.toString()}"];"""
}
}
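
Not part of this patch: a self-contained sketch of how makeDotNode above assembles a Graphviz label, using escaped "\n" so the dot statement stays on one line (node id, name, and metric values are hypothetical):

object DotNodeSketch {
  // Mirrors SparkPlanGraphNode.makeDotNode: the node name, an escaped blank line,
  // then one metric value per escaped line.
  def makeDotNode(id: Long, name: String, metricValues: Seq[String]): String = {
    val builder = new StringBuilder(name)
    if (metricValues.nonEmpty) {
      builder ++= "\\n \\n"
      builder ++= metricValues.mkString("\\n")
    }
    s"""  $id [label="${builder.toString()}"];"""
  }

  def main(args: Array[String]): Unit = {
    println(makeDotNode(1, "Scan ExistingRDD", Seq("number of output rows: 5")))
    //   1 [label="Scan ExistingRDD\n \nnumber of output rows: 5"];
  }
}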
http://git-wip-us.apache.org/repos/asf/spark/blob/e541f703/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 9ace25d..fc8ce69 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -422,7 +422,7 @@ abstract class HadoopFsRelation private[sql](
parameters: Map[String, String])
extends BaseRelation with FileRelation with Logging {
- override def toString: String = getClass.getSimpleName + paths.mkString("[", ",", "]")
+ override def toString: String = getClass.getSimpleName
def this() = this(None, Map.empty[String, String])
http://git-wip-us.apache.org/repos/asf/spark/blob/e541f703/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index a462625..2fb439f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -169,7 +169,7 @@ class PlannerSuite extends SharedSQLContext {
withTempTable("testPushed") {
val exp = sql("select * from testPushed where key = 15").queryExecution.executedPlan
- assert(exp.toString.contains("PushedFilter: [EqualTo(key,15)]"))
+ assert(exp.toString.contains("PushedFilters: [EqualTo(key,15)]"))
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/e541f703/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 9a981d0..08b291e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -411,7 +411,12 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
// evil case insensitivity issue, which is reconciled within `ParquetRelation`.
val parquetOptions = Map(
ParquetRelation.METASTORE_SCHEMA -> metastoreSchema.json,
- ParquetRelation.MERGE_SCHEMA -> mergeSchema.toString)
+ ParquetRelation.MERGE_SCHEMA -> mergeSchema.toString,
+ ParquetRelation.METASTORE_TABLE_NAME -> TableIdentifier(
+ metastoreRelation.tableName,
+ Some(metastoreRelation.databaseName)
+ ).unquotedString
+ )
val tableIdentifier =
QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
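
Not part of this patch: a tiny sketch of the identifier string passed as METASTORE_TABLE_NAME above, i.e. the unquoted "database.table" form (the Ident class below is a hypothetical stand-in, not Spark's TableIdentifier):

object TableNameSketch {
  // Hypothetical stand-in for TableIdentifier.unquotedString.
  case class Ident(table: String, database: Option[String]) {
    def unquotedString: String = (database.toSeq :+ table).mkString(".")
  }

  def main(args: Array[String]): Unit = {
    println(Ident("demo_table", Some("default")).unquotedString)  // default.demo_table
    println(Ident("demo_table", None).unquotedString)             // demo_table
  }
}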