You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/06/23 12:44:34 UTC

spark git commit: [SPARK-21165] [SQL] [2.2] Use executedPlan instead of analyzedPlan in INSERT AS SELECT [WIP]

Repository: spark
Updated Branches:
  refs/heads/branch-2.2 b99c0e9d1 -> b6749ba09


[SPARK-21165] [SQL] [2.2] Use executedPlan instead of analyzedPlan in INSERT AS SELECT [WIP]

### What changes were proposed in this pull request?

The input query schema of INSERT AS SELECT could be changed after optimization. For example, the output schema of the following query is changed by the rules `SimplifyCasts` and `RemoveRedundantAliases`.
```SQL
 SELECT word, length, cast(first as string) as first FROM view1
```

This PR fixes the issue in Spark 2.2. Instead of using the analyzed plan of the input query, this PR uses its executed plan to determine the attributes in `FileFormatWriter`.

The related issue in the master branch has been fixed by https://github.com/apache/spark/pull/18064. After this PR is merged, I will submit a separate PR to merge the test case into the master branch.

### How was this patch tested?
Added a test case

Author: Xiao Li <ga...@gmail.com>
Author: gatorsmile <ga...@gmail.com>

Closes #18386 from gatorsmile/newRC5.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b6749ba0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b6749ba0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b6749ba0

Branch: refs/heads/branch-2.2
Commit: b6749ba09724b3ed19166e7bb0b1fdcca79a44ba
Parents: b99c0e9
Author: Xiao Li <ga...@gmail.com>
Authored: Fri Jun 23 20:44:25 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Jun 23 20:44:25 2017 +0800

----------------------------------------------------------------------
 .../sql/execution/datasources/DataSource.scala  | 12 +----------
 .../datasources/DataSourceStrategy.scala        |  4 +---
 .../datasources/FileFormatWriter.scala          | 15 ++++++++++---
 .../InsertIntoHadoopFsRelationCommand.scala     | 10 ++++-----
 .../spark/sql/execution/datasources/rules.scala | 16 +++++---------
 .../execution/streaming/FileStreamSink.scala    | 11 +---------
 .../hive/execution/InsertIntoHiveTable.scala    |  9 +-------
 .../sql/hive/InsertIntoHiveTableSuite.scala     | 22 ++++++++++++++++++++
 8 files changed, 48 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b6749ba0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 14c4060..4ffe215 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -408,16 +408,6 @@ case class DataSource(
     val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
     PartitioningUtils.validatePartitionColumn(data.schema, partitionColumns, caseSensitive)
 
-    // SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
-    // not need to have the query as child, to avoid to analyze an optimized query,
-    // because InsertIntoHadoopFsRelationCommand will be optimized first.
-    val partitionAttributes = partitionColumns.map { name =>
-      val plan = data.logicalPlan
-      plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse {
-        throw new AnalysisException(
-          s"Unable to resolve $name given [${plan.output.map(_.name).mkString(", ")}]")
-      }.asInstanceOf[Attribute]
-    }
     val fileIndex = catalogTable.map(_.identifier).map { tableIdent =>
       sparkSession.table(tableIdent).queryExecution.analyzed.collect {
         case LogicalRelation(t: HadoopFsRelation, _, _) => t.location
@@ -431,7 +421,7 @@ case class DataSource(
         outputPath = outputPath,
         staticPartitions = Map.empty,
         ifPartitionNotExists = false,
-        partitionColumns = partitionAttributes,
+        partitionColumns = partitionColumns,
         bucketSpec = bucketSpec,
         fileFormat = format,
         options = options,

http://git-wip-us.apache.org/repos/asf/spark/blob/b6749ba0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index e05a8d5..ded9303 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -188,15 +188,13 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
           "Cannot overwrite a path that is also being read from.")
       }
 
-      val partitionSchema = actualQuery.resolve(
-        t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver)
       val staticPartitions = parts.filter(_._2.nonEmpty).map { case (k, v) => k -> v.get }
 
       InsertIntoHadoopFsRelationCommand(
         outputPath,
         staticPartitions,
         i.ifPartitionNotExists,
-        partitionSchema,
+        partitionColumns = t.partitionSchema.map(_.name),
         t.bucketSpec,
         t.fileFormat,
         t.options,

http://git-wip-us.apache.org/repos/asf/spark/blob/b6749ba0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 4ec09bf..2c31d2a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -101,7 +101,7 @@ object FileFormatWriter extends Logging {
       committer: FileCommitProtocol,
       outputSpec: OutputSpec,
       hadoopConf: Configuration,
-      partitionColumns: Seq[Attribute],
+      partitionColumnNames: Seq[String],
       bucketSpec: Option[BucketSpec],
       refreshFunction: (Seq[TablePartitionSpec]) => Unit,
       options: Map[String, String]): Unit = {
@@ -111,9 +111,18 @@ object FileFormatWriter extends Logging {
     job.setOutputValueClass(classOf[InternalRow])
     FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))
 
-    val allColumns = queryExecution.logical.output
+    val allColumns = queryExecution.executedPlan.output
+    // Get the actual partition columns as attributes after matching them by name with
+    // the given columns names.
+    val partitionColumns = partitionColumnNames.map { col =>
+      val nameEquality = sparkSession.sessionState.conf.resolver
+      allColumns.find(f => nameEquality(f.name, col)).getOrElse {
+        throw new RuntimeException(
+          s"Partition column $col not found in schema ${queryExecution.executedPlan.schema}")
+      }
+    }
     val partitionSet = AttributeSet(partitionColumns)
-    val dataColumns = queryExecution.logical.output.filterNot(partitionSet.contains)
+    val dataColumns = allColumns.filterNot(partitionSet.contains)
 
     val bucketIdExpression = bucketSpec.map { spec =>
       val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get)

http://git-wip-us.apache.org/repos/asf/spark/blob/b6749ba0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index c9d3144..ab35fdc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -44,7 +44,7 @@ case class InsertIntoHadoopFsRelationCommand(
     outputPath: Path,
     staticPartitions: TablePartitionSpec,
     ifPartitionNotExists: Boolean,
-    partitionColumns: Seq[Attribute],
+    partitionColumns: Seq[String],
     bucketSpec: Option[BucketSpec],
     fileFormat: FileFormat,
     options: Map[String, String],
@@ -150,7 +150,7 @@ case class InsertIntoHadoopFsRelationCommand(
         outputSpec = FileFormatWriter.OutputSpec(
           qualifiedOutputPath.toString, customPartitionLocations),
         hadoopConf = hadoopConf,
-        partitionColumns = partitionColumns,
+        partitionColumnNames = partitionColumns,
         bucketSpec = bucketSpec,
         refreshFunction = refreshPartitionsCallback,
         options = options)
@@ -176,10 +176,10 @@ case class InsertIntoHadoopFsRelationCommand(
       customPartitionLocations: Map[TablePartitionSpec, String],
       committer: FileCommitProtocol): Unit = {
     val staticPartitionPrefix = if (staticPartitions.nonEmpty) {
-      "/" + partitionColumns.flatMap { p =>
-        staticPartitions.get(p.name) match {
+      "/" + partitionColumns.flatMap { col =>
+        staticPartitions.get(col) match {
           case Some(value) =>
-            Some(escapePathName(p.name) + "=" + escapePathName(value))
+            Some(escapePathName(col) + "=" + escapePathName(value))
           case None =>
             None
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/b6749ba0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 3f4a785..45f2a41 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -127,11 +127,11 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
       val resolver = sparkSession.sessionState.conf.resolver
       val tableCols = existingTable.schema.map(_.name)
 
-      // As we are inserting into an existing table, we should respect the existing schema and
-      // adjust the column order of the given dataframe according to it, or throw exception
-      // if the column names do not match.
+      // As we are inserting into an existing table, we should respect the existing schema, preserve
+      // the case and adjust the column order of the given DataFrame according to it, or throw
+      // an exception if the column names do not match.
       val adjustedColumns = tableCols.map { col =>
-        query.resolve(Seq(col), resolver).getOrElse {
+        query.resolve(Seq(col), resolver).map(Alias(_, col)()).getOrElse {
           val inputColumns = query.schema.map(_.name).mkString(", ")
           throw new AnalysisException(
             s"cannot resolve '$col' given input columns: [$inputColumns]")
@@ -168,15 +168,9 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
           """.stripMargin)
       }
 
-      val newQuery = if (adjustedColumns != query.output) {
-        Project(adjustedColumns, query)
-      } else {
-        query
-      }
-
       c.copy(
         tableDesc = existingTable,
-        query = Some(newQuery))
+        query = Some(Project(adjustedColumns, query)))
 
     // Here we normalize partition, bucket and sort column names, w.r.t. the case sensitivity
     // config, and do various checks:

http://git-wip-us.apache.org/repos/asf/spark/blob/b6749ba0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index 6885d0b..2a65292 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -111,15 +111,6 @@ class FileStreamSink(
         case _ =>  // Do nothing
       }
 
-      // Get the actual partition columns as attributes after matching them by name with
-      // the given columns names.
-      val partitionColumns: Seq[Attribute] = partitionColumnNames.map { col =>
-        val nameEquality = data.sparkSession.sessionState.conf.resolver
-        data.logicalPlan.output.find(f => nameEquality(f.name, col)).getOrElse {
-          throw new RuntimeException(s"Partition column $col not found in schema ${data.schema}")
-        }
-      }
-
       FileFormatWriter.write(
         sparkSession = sparkSession,
         queryExecution = data.queryExecution,
@@ -127,7 +118,7 @@ class FileStreamSink(
         committer = committer,
         outputSpec = FileFormatWriter.OutputSpec(path, Map.empty),
         hadoopConf = hadoopConf,
-        partitionColumns = partitionColumns,
+        partitionColumnNames = partitionColumnNames,
         bucketSpec = None,
         refreshFunction = _ => (),
         options = options)

http://git-wip-us.apache.org/repos/asf/spark/blob/b6749ba0/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 35f65e9..797481c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -314,13 +314,6 @@ case class InsertIntoHiveTable(
       outputPath = tmpLocation.toString,
       isAppend = false)
 
-    val partitionAttributes = partitionColumnNames.takeRight(numDynamicPartitions).map { name =>
-      query.resolve(name :: Nil, sparkSession.sessionState.analyzer.resolver).getOrElse {
-        throw new AnalysisException(
-          s"Unable to resolve $name given [${query.output.map(_.name).mkString(", ")}]")
-      }.asInstanceOf[Attribute]
-    }
-
     FileFormatWriter.write(
       sparkSession = sparkSession,
       queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
@@ -328,7 +321,7 @@ case class InsertIntoHiveTable(
       committer = committer,
       outputSpec = FileFormatWriter.OutputSpec(tmpLocation.toString, Map.empty),
       hadoopConf = hadoopConf,
-      partitionColumns = partitionAttributes,
+      partitionColumnNames = partitionColumnNames.takeRight(numDynamicPartitions),
       bucketSpec = None,
       refreshFunction = _ => (),
       options = Map.empty)

http://git-wip-us.apache.org/repos/asf/spark/blob/b6749ba0/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index 58ab0c2..618e5b6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -468,6 +468,28 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
       }
   }
 
+  test("SPARK-21165: the query schema of INSERT is changed after optimization") {
+    withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
+      withTable("tab1", "tab2") {
+        Seq(("a", "b", 3)).toDF("word", "first", "length").write.saveAsTable("tab1")
+
+        spark.sql(
+          """
+            |CREATE TABLE tab2 (word string, length int)
+            |PARTITIONED BY (first string)
+          """.stripMargin)
+
+        spark.sql(
+          """
+            |INSERT INTO TABLE tab2 PARTITION(first)
+            |SELECT word, length, cast(first as string) as first FROM tab1
+          """.stripMargin)
+
+        checkAnswer(spark.table("tab2"), Row("a", 3, "b"))
+      }
+    }
+  }
+
   testPartitionedTable("insertInto() should reject extra columns") {
     tableName =>
       sql("CREATE TABLE t (a INT, b INT, c INT, d INT, e INT)")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org