You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by on 2018/02/12 14:08:07 UTC

spark git commit: [SPARK-22977][SQL] fix web UI SQL tab for CTAS

Repository: spark
Updated Branches:
  refs/heads/master caeb108e2 -> 0e2c266de

[SPARK-22977][SQL] fix web UI SQL tab for CTAS

## What changes were proposed in this pull request?

This is a regression in Spark 2.3.

In Spark 2.2, we have fragile UI support for SQL data writing commands. We only track the input query plan of `FileFormatWriter` and display its metrics. This is not ideal because we don't know who triggered the writing (it can be a table insertion, CTAS, etc.), but it's still useful to see the metrics of the input query.

In Spark 2.3, we introduced a new mechanism, `DataWritingCommand`, to fix the UI issue entirely. Now these writing commands have real children, and we don't need to hack into the `FileFormatWriter` for the UI. This also helps with `explain`: now `explain` can show the physical plan of the input query, while in 2.2 the physical writing plan is simply `ExecutedCommandExec` and it has no child.

However, there is a regression in CTAS. CTAS commands don't extend `DataWritingCommand`, and we don't have the UI hack in `FileFormatWriter` anymore, so the UI for CTAS is just an empty node. See SPARK-22977 for more information about this UI issue.

To fix it, we should apply the `DataWritingCommand` mechanism to CTAS commands.

TODO: In the future, we should refactor this part and create some physical layer code pieces for data writing, and reuse them in different writing commands. We should have different logical nodes for different operators, even if some of them share the same logic, e.g. CTAS, CREATE TABLE, INSERT TABLE. Internally we can share the same physical logic.

## How was this patch tested?

manually tested.
For data source table
<img width="644" alt="1" src="">
For hive table
<img width="666" alt="2" src="">

Author: Wenchen Fan <>

Closes #20521 from cloud-fan/UI.


Branch: refs/heads/master
Commit: 0e2c266de7189473177f45aa68ea6a45c7e47ec3
Parents: caeb108
Author: Wenchen Fan <>
Authored: Mon Feb 12 22:07:59 2018 +0800
Committer: Wenchen Fan <>
Committed: Mon Feb 12 22:07:59 2018 +0800

 .../command/createDataSourceTables.scala        | 21 ++++----
 .../sql/execution/datasources/DataSource.scala  | 44 +++++++++++++---
 .../datasources/DataSourceStrategy.scala        |  2 +-
 .../apache/spark/sql/hive/HiveStrategies.scala  |  2 +-
 .../CreateHiveTableAsSelectCommand.scala        | 55 +++++++++++---------
 .../sql/hive/execution/HiveExplainSuite.scala   | 26 ---------
 6 files changed, 80 insertions(+), 70 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 306f43d..e974776 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -21,7 +21,9 @@ import
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.StructType
@@ -136,12 +138,11 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
 case class CreateDataSourceTableAsSelectCommand(
     table: CatalogTable,
     mode: SaveMode,
-    query: LogicalPlan)
-  extends RunnableCommand {
-  override protected def innerChildren: Seq[LogicalPlan] = Seq(query)
+    query: LogicalPlan,
+    outputColumns: Seq[Attribute])
+  extends DataWritingCommand {
-  override def run(sparkSession: SparkSession): Seq[Row] = {
+  override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
     assert(table.tableType != CatalogTableType.VIEW)
@@ -163,7 +164,7 @@ case class CreateDataSourceTableAsSelectCommand(
-        sparkSession, table,, query, SaveMode.Append, tableExists = true)
+        sparkSession, table,, child, SaveMode.Append, tableExists = true)
     } else {
@@ -173,7 +174,7 @@ case class CreateDataSourceTableAsSelectCommand(
       val result = saveDataIntoTable(
-        sparkSession, table, tableLocation, query, SaveMode.Overwrite, tableExists = false)
+        sparkSession, table, tableLocation, child, SaveMode.Overwrite, tableExists = false)
       val newTable = table.copy(
         storage = = tableLocation),
         // We will use the schema of resolved.relation as the schema of the table (instead of
@@ -198,10 +199,10 @@ case class CreateDataSourceTableAsSelectCommand(
       session: SparkSession,
       table: CatalogTable,
       tableLocation: Option[URI],
-      data: LogicalPlan,
+      physicalPlan: SparkPlan,
       mode: SaveMode,
       tableExists: Boolean): BaseRelation = {
-    // Create the relation based on the input logical plan: `data`.
+    // Create the relation based on the input logical plan: `query`.
     val pathOption ="path" -> CatalogUtils.URIToString(_))
     val dataSource = DataSource(
@@ -212,7 +213,7 @@ case class CreateDataSourceTableAsSelectCommand(
       catalogTable = if (tableExists) Some(table) else None)
     try {
-      dataSource.writeAndRead(mode, query)
+      dataSource.writeAndRead(mode, query, outputColumns, physicalPlan)
     } catch {
       case ex: AnalysisException =>
         logError(s"Failed to write to table ${table.identifier.unquotedString}", ex)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 25e1210..6e1b572 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -31,8 +31,10 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogUtils}
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
@@ -435,10 +437,11 @@ case class DataSource(
-   * Writes the given [[LogicalPlan]] out in this [[FileFormat]].
+   * Creates a command node to write the given [[LogicalPlan]] out to the given [[FileFormat]].
+   * The returned command is unresolved and need to be analyzed.
   private def planForWritingFileFormat(
-      format: FileFormat, mode: SaveMode, data: LogicalPlan): LogicalPlan = {
+      format: FileFormat, mode: SaveMode, data: LogicalPlan): InsertIntoHadoopFsRelationCommand = {
     // Don't glob path for the write path.  The contracts here are:
     //  1. Only one output path can be specified on the write path;
     //  2. Output path must be a legal HDFS style file system path;
@@ -482,9 +485,24 @@ case class DataSource(
    * Writes the given [[LogicalPlan]] out to this [[DataSource]] and returns a [[BaseRelation]] for
    * the following reading.
+   *
+   * @param mode The save mode for this writing.
+   * @param data The input query plan that produces the data to be written. Note that this plan
+   *             is analyzed and optimized.
+   * @param outputColumns The original output columns of the input query plan. The optimizer may not
+   *                      preserve the output column's names' case, so we need this parameter
+   *                      instead of `data.output`.
+   * @param physicalPlan The physical plan of the input query plan. We should run the writing
+   *                     command with this physical plan instead of creating a new physical plan,
+   *                     so that the metrics can be correctly linked to the given physical plan and
+   *                     shown in the web UI.
-  def writeAndRead(mode: SaveMode, data: LogicalPlan): BaseRelation = {
-    if ([CalendarIntervalType])) {
+  def writeAndRead(
+      mode: SaveMode,
+      data: LogicalPlan,
+      outputColumns: Seq[Attribute],
+      physicalPlan: SparkPlan): BaseRelation = {
+    if ([CalendarIntervalType])) {
       throw new AnalysisException("Cannot save interval data type into external storage.")
@@ -493,9 +511,23 @@ case class DataSource(
           sparkSession.sqlContext, mode, caseInsensitiveOptions, Dataset.ofRows(sparkSession, data))
       case format: FileFormat =>
-        sparkSession.sessionState.executePlan(planForWritingFileFormat(format, mode, data)).toRdd
+        val cmd = planForWritingFileFormat(format, mode, data)
+        val resolvedPartCols = { col =>
+          // The partition columns created in `planForWritingFileFormat` should always be
+          // `UnresolvedAttribute` with a single name part.
+          assert(col.isInstanceOf[UnresolvedAttribute])
+          val unresolved = col.asInstanceOf[UnresolvedAttribute]
+          assert(unresolved.nameParts.length == 1)
+          val name = unresolved.nameParts.head
+          outputColumns.find(a => equality(, name)).getOrElse {
+            throw new AnalysisException(
+              s"Unable to resolve $name given [${", ")}]")
+          }
+        }
+        val resolved = cmd.copy(partitionColumns = resolvedPartCols, outputColumns = outputColumns)
+, physicalPlan)
         // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring
-        copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
+        copy(userSpecifiedSchema = Some(outputColumns.toStructType.asNullable)).resolveRelation()
       case _ =>
         sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index d94c5bb..3f41612 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -139,7 +139,7 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
     case CreateTable(tableDesc, mode, Some(query))
         if query.resolved && DDLUtils.isDatasourceTable(tableDesc) =>
       DDLUtils.checkDataColNames(tableDesc.copy(schema = query.schema))
-      CreateDataSourceTableAsSelectCommand(tableDesc, mode, query)
+      CreateDataSourceTableAsSelectCommand(tableDesc, mode, query, query.output)
     case InsertIntoTable(l @ LogicalRelation(_: InsertableRelation, _, _, _),
         parts, query, overwrite, false) if parts.isEmpty =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index ab857b9..8df05cb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -157,7 +157,7 @@ object HiveAnalysis extends Rule[LogicalPlan] {
     case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
-      CreateHiveTableAsSelectCommand(tableDesc, query, mode)
+      CreateHiveTableAsSelectCommand(tableDesc, query, query.output, mode)
     case InsertIntoDir(isLocal, storage, provider, child, overwrite)
         if DDLUtils.isHiveTable(provider) =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index 65e8b4e..1e801fe 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -20,10 +20,11 @@ package org.apache.spark.sql.hive.execution
 import scala.util.control.NonFatal
 import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
-import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.DataWritingCommand
@@ -36,15 +37,15 @@ import org.apache.spark.sql.execution.command.RunnableCommand
 case class CreateHiveTableAsSelectCommand(
     tableDesc: CatalogTable,
     query: LogicalPlan,
+    outputColumns: Seq[Attribute],
     mode: SaveMode)
-  extends RunnableCommand {
+  extends DataWritingCommand {
   private val tableIdentifier = tableDesc.identifier
-  override def innerChildren: Seq[LogicalPlan] = Seq(query)
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    if (sparkSession.sessionState.catalog.tableExists(tableIdentifier)) {
+  override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
+    if (catalog.tableExists(tableIdentifier)) {
       assert(mode != SaveMode.Overwrite,
         s"Expect the table $tableIdentifier has been dropped when the save mode is Overwrite")
@@ -56,34 +57,36 @@ case class CreateHiveTableAsSelectCommand(
         return Seq.empty
-      sparkSession.sessionState.executePlan(
-        InsertIntoTable(
-          UnresolvedRelation(tableIdentifier),
-          Map(),
-          query,
-          overwrite = false,
-          ifPartitionNotExists = false)).toRdd
+      InsertIntoHiveTable(
+        tableDesc,
+        Map.empty,
+        query,
+        overwrite = false,
+        ifPartitionNotExists = false,
+        outputColumns = outputColumns).run(sparkSession, child)
     } else {
       // TODO ideally, we should get the output data ready first and then
       // add the relation into catalog, just in case of failure occurs while data
       // processing.
-      sparkSession.sessionState.catalog.createTable(
-        tableDesc.copy(schema = query.schema), ignoreIfExists = false)
+      catalog.createTable(tableDesc.copy(schema = query.schema), ignoreIfExists = false)
       try {
-        sparkSession.sessionState.executePlan(
-          InsertIntoTable(
-            UnresolvedRelation(tableIdentifier),
-            Map(),
-            query,
-            overwrite = true,
-            ifPartitionNotExists = false)).toRdd
+        // Read back the metadata of the table which was created just now.
+        val createdTableMeta = catalog.getTableMetadata(tableDesc.identifier)
+        // For CTAS, there is no static partition values to insert.
+        val partition = -> None).toMap
+        InsertIntoHiveTable(
+          createdTableMeta,
+          partition,
+          query,
+          overwrite = true,
+          ifPartitionNotExists = false,
+          outputColumns = outputColumns).run(sparkSession, child)
       } catch {
         case NonFatal(e) =>
           // drop the created table.
-          sparkSession.sessionState.catalog.dropTable(tableIdentifier, ignoreIfNotExists = true,
-            purge = false)
+          catalog.dropTable(tableIdentifier, ignoreIfNotExists = true, purge = false)
           throw e
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
index f84d188..5d56f89 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
@@ -128,32 +128,6 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
-  test("SPARK-17409: The EXPLAIN output of CTAS only shows the analyzed plan") {
-    withTempView("jt") {
-      val ds = (1 to 10).map(i => s"""{"a":$i, "b":"str$i"}""").toDS()
-      val outputs = sql(
-        s"""
-           |CREATE TABLE t1
-           |AS
-           |SELECT * FROM jt
-         """.stripMargin).collect().map(_.mkString).mkString
-      val shouldContain =
-        "== Parsed Logical Plan ==" :: "== Analyzed Logical Plan ==" :: "Subquery" ::
-        "== Optimized Logical Plan ==" :: "== Physical Plan ==" ::
-        "CreateHiveTableAsSelect" :: "InsertIntoHiveTable" :: "jt" :: Nil
-      for (key <- shouldContain) {
-        assert(outputs.contains(key), s"$key doesn't exist in result")
-      }
-      val physicalIndex = outputs.indexOf("== Physical Plan ==")
-      assert(outputs.substring(physicalIndex).contains("Subquery"),
-        "Physical Plan should contain SubqueryAlias since the query should not be optimized")
-    }
-  }
   test("explain output of physical plan should contain proper codegen stage ID") {

To unsubscribe, e-mail:
For additional commands, e-mail: