Posted to commits@spark.apache.org by li...@apache.org on 2017/10/13 04:54:04 UTC

spark git commit: [SPARK-22252][SQL][2.2] FileFormatWriter should respect the input query schema

Repository: spark
Updated Branches:
  refs/heads/branch-2.2 cfc04e062 -> c9187db80


[SPARK-22252][SQL][2.2] FileFormatWriter should respect the input query schema

## What changes were proposed in this pull request?

https://github.com/apache/spark/pull/18386 fixes SPARK-21165 but breaks SPARK-22252. This PR reverts https://github.com/apache/spark/pull/18386 and instead applies the patch from https://github.com/apache/spark/pull/19483 to fix SPARK-21165.

## How was this patch tested?

A new regression test in DataFrameSuite; a sketch of the scenario it covers follows.
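
For illustration, a minimal sketch of that scenario in spark-shell style (assuming `spark` is a SparkSession with `spark.implicits._` imported; table names are only examples):

  // Write a table with lower-case column names, then re-select the columns with a different
  // case and write the result out again. The second write should respect the input query
  // schema and still produce the row (0, 0).
  spark.range(1).select('id as 'col1, 'id as 'col2).write.saveAsTable("t1")
  spark.sql("select COL1, COL2 from t1").write.saveAsTable("t2")
  spark.table("t2").show()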

Author: Wenchen Fan <we...@databricks.com>

Closes #19484 from cloud-fan/bug.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c9187db8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c9187db8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c9187db8

Branch: refs/heads/branch-2.2
Commit: c9187db80d424dec29550c4f8e9441a2dcb30a10
Parents: cfc04e0
Author: Wenchen Fan <we...@databricks.com>
Authored: Thu Oct 12 21:54:00 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Thu Oct 12 21:54:00 2017 -0700

----------------------------------------------------------------------
 .../sql/execution/datasources/DataSource.scala  | 12 +++++++++-
 .../datasources/DataSourceStrategy.scala        |  4 +++-
 .../datasources/FileFormatWriter.scala          | 24 +++++++++-----------
 .../InsertIntoHadoopFsRelationCommand.scala     | 10 ++++----
 .../execution/streaming/FileStreamSink.scala    | 11 ++++++++-
 .../org/apache/spark/sql/DataFrameSuite.scala   | 13 +++++++++++
 .../hive/execution/InsertIntoHiveTable.scala    |  9 +++++++-
 .../sql/hive/InsertIntoHiveTableSuite.scala     |  2 +-
 8 files changed, 62 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c9187db8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index a13bb24..9652f7c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -401,6 +401,16 @@ case class DataSource(
     val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
     PartitioningUtils.validatePartitionColumn(data.schema, partitionColumns, caseSensitive)
 
+    // SPARK-17230: Resolve the partition columns up front so InsertIntoHadoopFsRelationCommand
+    // does not need to keep the query as a child plan; since the command itself is optimized
+    // first, keeping the query as a child would mean analyzing an already optimized query.
+    val partitionAttributes = partitionColumns.map { name =>
+      val plan = data.logicalPlan
+      plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse {
+        throw new AnalysisException(
+          s"Unable to resolve $name given [${plan.output.map(_.name).mkString(", ")}]")
+      }.asInstanceOf[Attribute]
+    }
     val fileIndex = catalogTable.map(_.identifier).map { tableIdent =>
       sparkSession.table(tableIdent).queryExecution.analyzed.collect {
         case LogicalRelation(t: HadoopFsRelation, _, _) => t.location
@@ -414,7 +424,7 @@ case class DataSource(
         outputPath = outputPath,
         staticPartitions = Map.empty,
         ifPartitionNotExists = false,
-        partitionColumns = partitionColumns,
+        partitionColumns = partitionAttributes,
         bucketSpec = bucketSpec,
         fileFormat = format,
         options = options,
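
For reference, the name-to-attribute resolution above can be sketched in isolation with catalyst's plan classes (the relation and column names here are only illustrative):

  import org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
  import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
  import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
  import org.apache.spark.sql.types.{LongType, StringType}

  // The analyzed plan exposes its output as attributes with stable exprIds.
  val plan = LocalRelation(
    AttributeReference("id", LongType)(),
    AttributeReference("part", StringType)())

  // Map a partition column *name* to the query's output *attribute*, so the write command can
  // later refer to the column by exprId instead of re-resolving the name against another plan.
  val partAttr: Attribute = plan
    .resolve(Seq("PART"), caseInsensitiveResolution)
    .getOrElse(sys.error(s"Unable to resolve PART given [${plan.output.map(_.name).mkString(", ")}]"))
    .asInstanceOf[Attribute]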

http://git-wip-us.apache.org/repos/asf/spark/blob/c9187db8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 04ee081..0df1c38 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -188,13 +188,15 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
           "Cannot overwrite a path that is also being read from.")
       }
 
+      val partitionSchema = actualQuery.resolve(
+        t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver)
       val staticPartitions = parts.filter(_._2.nonEmpty).map { case (k, v) => k -> v.get }
 
       InsertIntoHadoopFsRelationCommand(
         outputPath,
         staticPartitions,
         i.ifPartitionNotExists,
-        partitionColumns = t.partitionSchema.map(_.name),
+        partitionSchema,
         t.bucketSpec,
         t.fileFormat,
         t.options,

http://git-wip-us.apache.org/repos/asf/spark/blob/c9187db8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 2c31d2a..e87cf8d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.execution.{QueryExecution, SortExec, SQLExecution}
-import org.apache.spark.sql.types.{StringType, StructType}
+import org.apache.spark.sql.types.StringType
 import org.apache.spark.util.{SerializableConfiguration, Utils}
 
 
@@ -101,7 +101,7 @@ object FileFormatWriter extends Logging {
       committer: FileCommitProtocol,
       outputSpec: OutputSpec,
       hadoopConf: Configuration,
-      partitionColumnNames: Seq[String],
+      partitionColumns: Seq[Attribute],
       bucketSpec: Option[BucketSpec],
       refreshFunction: (Seq[TablePartitionSpec]) => Unit,
       options: Map[String, String]): Unit = {
@@ -111,16 +111,9 @@ object FileFormatWriter extends Logging {
     job.setOutputValueClass(classOf[InternalRow])
     FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))
 
-    val allColumns = queryExecution.executedPlan.output
-    // Get the actual partition columns as attributes after matching them by name with
-    // the given columns names.
-    val partitionColumns = partitionColumnNames.map { col =>
-      val nameEquality = sparkSession.sessionState.conf.resolver
-      allColumns.find(f => nameEquality(f.name, col)).getOrElse {
-        throw new RuntimeException(
-          s"Partition column $col not found in schema ${queryExecution.executedPlan.schema}")
-      }
-    }
+    // Pick the attributes from the analyzed plan, as the optimizer may not preserve the case
+    // of the column names in the output schema.
+    val allColumns = queryExecution.analyzed.output
     val partitionSet = AttributeSet(partitionColumns)
     val dataColumns = allColumns.filterNot(partitionSet.contains)
 
@@ -179,8 +172,13 @@ object FileFormatWriter extends Logging {
         val rdd = if (orderingMatched) {
           queryExecution.toRdd
         } else {
+          // SPARK-21165: `requiredOrdering` is built from the analyzed plan's attributes, while
+          // the physical plan may carry different attribute ids because the optimizer removed
+          // some aliases. Bind the expressions up front to avoid an attribute id mismatch.
+          val orderingExpr = requiredOrdering
+            .map(SortOrder(_, Ascending)).map(BindReferences.bindReference(_, allColumns))
           SortExec(
-            requiredOrdering.map(SortOrder(_, Ascending)),
+            orderingExpr,
             global = false,
             child = queryExecution.executedPlan).execute()
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/c9187db8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index ab35fdc..c9d3144 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -44,7 +44,7 @@ case class InsertIntoHadoopFsRelationCommand(
     outputPath: Path,
     staticPartitions: TablePartitionSpec,
     ifPartitionNotExists: Boolean,
-    partitionColumns: Seq[String],
+    partitionColumns: Seq[Attribute],
     bucketSpec: Option[BucketSpec],
     fileFormat: FileFormat,
     options: Map[String, String],
@@ -150,7 +150,7 @@ case class InsertIntoHadoopFsRelationCommand(
         outputSpec = FileFormatWriter.OutputSpec(
           qualifiedOutputPath.toString, customPartitionLocations),
         hadoopConf = hadoopConf,
-        partitionColumnNames = partitionColumns,
+        partitionColumns = partitionColumns,
         bucketSpec = bucketSpec,
         refreshFunction = refreshPartitionsCallback,
         options = options)
@@ -176,10 +176,10 @@ case class InsertIntoHadoopFsRelationCommand(
       customPartitionLocations: Map[TablePartitionSpec, String],
       committer: FileCommitProtocol): Unit = {
     val staticPartitionPrefix = if (staticPartitions.nonEmpty) {
-      "/" + partitionColumns.flatMap { col =>
-        staticPartitions.get(col) match {
+      "/" + partitionColumns.flatMap { p =>
+        staticPartitions.get(p.name) match {
           case Some(value) =>
-            Some(escapePathName(col) + "=" + escapePathName(value))
+            Some(escapePathName(p.name) + "=" + escapePathName(value))
           case None =>
             None
         }
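
A small sketch of what that static-partition prefix computation yields (column names and values are only examples; the real code also escapes names and values with escapePathName):

  // Static partitions become a path prefix, in partition-column order; only columns that have
  // a static value contribute a "name=value" segment.
  val partitionColumnNames = Seq("year", "month", "day")
  val staticPartitions = Map("year" -> "2017", "month" -> "10")

  val staticPartitionPrefix = "/" + partitionColumnNames.flatMap { name =>
    staticPartitions.get(name).map(value => name + "=" + value)
  }.mkString("/")
  // staticPartitionPrefix == "/year=2017/month=10"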

http://git-wip-us.apache.org/repos/asf/spark/blob/c9187db8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index 2a65292..6885d0b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -111,6 +111,15 @@ class FileStreamSink(
         case _ =>  // Do nothing
       }
 
+      // Get the actual partition columns as attributes after matching them by name with
+      // the given column names.
+      val partitionColumns: Seq[Attribute] = partitionColumnNames.map { col =>
+        val nameEquality = data.sparkSession.sessionState.conf.resolver
+        data.logicalPlan.output.find(f => nameEquality(f.name, col)).getOrElse {
+          throw new RuntimeException(s"Partition column $col not found in schema ${data.schema}")
+        }
+      }
+
       FileFormatWriter.write(
         sparkSession = sparkSession,
         queryExecution = data.queryExecution,
@@ -118,7 +127,7 @@ class FileStreamSink(
         committer = committer,
         outputSpec = FileFormatWriter.OutputSpec(path, Map.empty),
         hadoopConf = hadoopConf,
-        partitionColumnNames = partitionColumnNames,
+        partitionColumns = partitionColumns,
         bucketSpec = None,
         refreshFunction = _ => (),
         options = options)
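
The lookup above relies on the session resolver, which is case-insensitive unless spark.sql.caseSensitive is enabled. A minimal sketch of the same matching (column names are only examples):

  import org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
  import org.apache.spark.sql.catalyst.expressions.AttributeReference
  import org.apache.spark.sql.types.{LongType, StringType}

  val output = Seq(
    AttributeReference("value", LongType)(),
    AttributeReference("Date", StringType)())

  // With the default resolver, a partition column declared as "date" still resolves to the
  // streaming query's "Date" attribute.
  val dateAttr = output
    .find(a => caseInsensitiveResolution(a.name, "date"))
    .getOrElse(throw new RuntimeException("Partition column date not found"))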

http://git-wip-us.apache.org/repos/asf/spark/blob/c9187db8/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 3fa538c..1f29cb7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1759,4 +1759,17 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       testData2.select(lit(7), 'a, 'b).orderBy(lit(1), lit(2), lit(3)),
       Seq(Row(7, 1, 1), Row(7, 1, 2), Row(7, 2, 1), Row(7, 2, 2), Row(7, 3, 1), Row(7, 3, 2)))
   }
+
+  test("SPARK-22252: FileFormatWriter should respect the input query schema") {
+    withTable("t1", "t2", "t3", "t4") {
+      spark.range(1).select('id as 'col1, 'id as 'col2).write.saveAsTable("t1")
+      spark.sql("select COL1, COL2 from t1").write.saveAsTable("t2")
+      checkAnswer(spark.table("t2"), Row(0, 0))
+
+      // Test picking part of the columns when writing.
+      spark.range(1).select('id, 'id as 'col1, 'id as 'col2).write.saveAsTable("t3")
+      spark.sql("select COL1, COL2 from t3").write.saveAsTable("t4")
+      checkAnswer(spark.table("t4"), Row(0, 0))
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c9187db8/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 66ee5d4..8032d7e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -314,6 +314,13 @@ case class InsertIntoHiveTable(
       outputPath = tmpLocation.toString,
       isAppend = false)
 
+    val partitionAttributes = partitionColumnNames.takeRight(numDynamicPartitions).map { name =>
+      query.resolve(name :: Nil, sparkSession.sessionState.analyzer.resolver).getOrElse {
+        throw new AnalysisException(
+          s"Unable to resolve $name given [${query.output.map(_.name).mkString(", ")}]")
+      }.asInstanceOf[Attribute]
+    }
+
     FileFormatWriter.write(
       sparkSession = sparkSession,
       queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
@@ -321,7 +328,7 @@ case class InsertIntoHiveTable(
       committer = committer,
       outputSpec = FileFormatWriter.OutputSpec(tmpLocation.toString, Map.empty),
       hadoopConf = hadoopConf,
-      partitionColumnNames = partitionColumnNames.takeRight(numDynamicPartitions),
+      partitionColumns = partitionAttributes,
       bucketSpec = None,
       refreshFunction = _ => (),
       options = Map.empty)
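
For the Hive path, only the dynamic partition columns are resolved against the query. Since static partition columns must precede dynamic ones in the partition spec, the dynamic columns are simply the trailing names, as this small sketch shows (names and the count are only examples):

  val partitionColumnNames = Seq("country", "year", "month")
  val numDynamicPartitions = 2

  // The last `numDynamicPartitions` partition columns are the dynamic ones; these are the names
  // that get resolved to attributes of the input query above.
  val dynamicPartitionNames = partitionColumnNames.takeRight(numDynamicPartitions)
  // dynamicPartitionNames == Seq("year", "month")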

http://git-wip-us.apache.org/repos/asf/spark/blob/c9187db8/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index 618e5b6..d696156 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -468,7 +468,7 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
       }
   }
 
-  test("SPARK-21165: the query schema of INSERT is changed after optimization") {
+  test("SPARK-21165: FileFormatWriter should only rely on attributes from analyzed plan") {
     withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
       withTable("tab1", "tab2") {
         Seq(("a", "b", 3)).toDF("word", "first", "length").write.saveAsTable("tab1")

