Posted to commits@spark.apache.org by do...@apache.org on 2018/09/11 16:20:38 UTC

spark git commit: [SPARK-25313][SQL][FOLLOW-UP][BACKPORT-2.3] Fix InsertIntoHiveDirCommand output schema issue for Parquet

Repository: spark
Updated Branches:
  refs/heads/branch-2.3 4b578184f -> 60e56bcab


[SPARK-25313][SQL][FOLLOW-UP][BACKPORT-2.3] Fix InsertIntoHiveDirCommand output schema issue for Parquet

## What changes were proposed in this pull request?

Backport https://github.com/apache/spark/pull/22359 to branch-2.3. The change makes INSERT OVERWRITE DIRECTORY ... STORED AS PARQUET write files whose column names come from the user-specified output columns rather than from the analyzed query's schema.
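
For context, a minimal reproduction of the issue, sketched under the assumption of a Hive-enabled SparkSession named spark (the /tmp/spark-25313 path is illustrative only; the regression test below uses a temporary directory and also sets spark.sql.hive.convertMetastoreParquet=false):

    spark.sql("CREATE TABLE tbl(id long)")
    spark.sql("INSERT OVERWRITE TABLE tbl VALUES 4")
    spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl")
    // The SELECT list asks for the upper-case name "ID"; before this fix the
    // Parquet files were written with the view's lower-case "id" instead.
    spark.sql("INSERT OVERWRITE LOCAL DIRECTORY '/tmp/spark-25313' " +
      "STORED AS PARQUET SELECT ID FROM view1")
    spark.read.parquet("/tmp/spark-25313").printSchema()  // expect column "ID"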

## How was this patch tested?

Unit tests: a regression test for SPARK-25313 is added to HiveDDLSuite (see the diff below).
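The suite can be run locally with something like build/sbt "hive/testOnly *HiveDDLSuite" (the usual Spark sbt test invocation; adjust to your environment).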

Closes #22387 from wangyum/SPARK-25313-FOLLOW-UP-branch-2.3.

Authored-by: Yuming Wang <yu...@ebay.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/60e56bca
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/60e56bca
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/60e56bca

Branch: refs/heads/branch-2.3
Commit: 60e56bcabf29addcada2c19ce368a0e32c99e412
Parents: 4b57818
Author: Yuming Wang <yu...@ebay.com>
Authored: Tue Sep 11 09:20:15 2018 -0700
Committer: Dongjoon Hyun <do...@apache.org>
Committed: Tue Sep 11 09:20:15 2018 -0700

----------------------------------------------------------------------
 .../execution/command/DataWritingCommand.scala   | 15 ---------------
 .../CreateHiveTableAsSelectCommand.scala         |  4 ++--
 .../execution/InsertIntoHiveDirCommand.scala     |  5 ++---
 .../sql/hive/execution/InsertIntoHiveTable.scala |  1 -
 .../sql/hive/execution/SaveAsHiveFile.scala      |  3 +--
 .../spark/sql/hive/execution/HiveDDLSuite.scala  | 19 +++++++++++++++++++
 6 files changed, 24 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/60e56bca/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
index 0a185b8..a1bb5af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
 import org.apache.spark.sql.execution.datasources.FileFormatWriter
 import org.apache.spark.sql.execution.metric.SQLMetric
-import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
 /**
@@ -75,18 +74,4 @@ object DataWritingCommand {
       attr.withName(outputName)
     }
   }
-
-  /**
-   * Returns schema of logical plan with provided names.
-   * The length of provided names should be the same of the length of [[LogicalPlan.schema]].
-   */
-  def logicalPlanSchemaWithNames(
-      query: LogicalPlan,
-      names: Seq[String]): StructType = {
-    assert(query.schema.length == names.length,
-      "The length of provided names doesn't match the length of query schema.")
-    StructType(query.schema.zip(names).map { case (structField, outputName) =>
-      structField.copy(name = outputName)
-    })
-  }
 }
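
The helper deleted above duplicated functionality the trait already exposes: DataWritingCommand.outputColumns renames the query's output attributes via attr.withName(outputName) (the retained code above), and outputColumns.toStructType converts those attributes into a schema. A minimal sketch of the equivalence in terms of public StructType APIs:

    import org.apache.spark.sql.types._

    // What the deleted logicalPlanSchemaWithNames did: rename each field of
    // the query's schema to the corresponding user-facing output name.
    def schemaWithNames(schema: StructType, names: Seq[String]): StructType = {
      assert(schema.length == names.length,
        "The length of provided names doesn't match the length of query schema.")
      StructType(schema.zip(names).map { case (field, name) => field.copy(name = name) })
    }

    // Callers now obtain the same schema from outputColumns.toStructType,
    // since outputColumns is query.output renamed the same way.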

http://git-wip-us.apache.org/repos/asf/spark/blob/60e56bca/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index 1ff680f..66a8440 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -69,8 +69,8 @@ case class CreateHiveTableAsSelectCommand(
       // add the relation into catalog, just in case of failure occurs while data
       // processing.
       assert(tableDesc.schema.isEmpty)
-      val schema = DataWritingCommand.logicalPlanSchemaWithNames(query, outputColumnNames)
-      catalog.createTable(tableDesc.copy(schema = schema), ignoreIfExists = false)
+      catalog.createTable(
+        tableDesc.copy(schema = outputColumns.toStructType), ignoreIfExists = false)
 
       try {
         // Read back the metadata of the table which was created just now.

http://git-wip-us.apache.org/repos/asf/spark/blob/60e56bca/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
index 0a73aaa..a24e902 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
@@ -66,7 +66,7 @@ case class InsertIntoHiveDirCommand(
       identifier = TableIdentifier(storage.locationUri.get.toString, Some("default")),
       tableType = org.apache.spark.sql.catalyst.catalog.CatalogTableType.VIEW,
       storage = storage,
-      schema = query.schema
+      schema = outputColumns.toStructType
     ))
     hiveTable.getMetadata.put(serdeConstants.SERIALIZATION_LIB,
       storage.serde.getOrElse(classOf[LazySimpleSerDe].getName))
@@ -104,8 +104,7 @@ case class InsertIntoHiveDirCommand(
         plan = child,
         hadoopConf = hadoopConf,
         fileSinkConf = fileSinkConf,
-        outputLocation = tmpPath.toString,
-        allColumns = outputColumns)
+        outputLocation = tmpPath.toString)
 
       val fs = writeToPath.getFileSystem(hadoopConf)
       if (overwrite && fs.exists(writeToPath)) {
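
This hunk and the schema change above are the heart of the fix: the temporary CatalogTable describing the directory write used to carry query.schema, so the analyzed plan's column names (e.g. a view's lower-case id) ended up in the written Parquet files; outputColumns.toStructType carries the names from the user's SELECT list instead. A small illustration of the difference, with values assumed to match the regression test below:

    import org.apache.spark.sql.types._

    // Name as the analyzed plan exposes it (from the view definition):
    val fromQuerySchema   = StructType(Seq(StructField("id", LongType)))
    // Name as the command's output columns carry it (from "SELECT ID ..."):
    val fromOutputColumns = StructType(Seq(StructField("ID", LongType)))

    // StructField names are case-sensitive, so the two schemas are unequal;
    // only the second matches what the user asked for.
    assert(fromQuerySchema != fromOutputColumns)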

http://git-wip-us.apache.org/repos/asf/spark/blob/60e56bca/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 75a0563..0ed464d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -198,7 +198,6 @@ case class InsertIntoHiveTable(
       hadoopConf = hadoopConf,
       fileSinkConf = fileSinkConf,
       outputLocation = tmpLocation.toString,
-      allColumns = outputColumns,
       partitionAttributes = partitionAttributes)
 
     if (partition.nonEmpty) {

http://git-wip-us.apache.org/repos/asf/spark/blob/60e56bca/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
index e484356..fb221a2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
@@ -51,7 +51,6 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
       hadoopConf: Configuration,
       fileSinkConf: FileSinkDesc,
       outputLocation: String,
-      allColumns: Seq[Attribute],
       customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty,
       partitionAttributes: Seq[Attribute] = Nil): Set[String] = {
 
@@ -90,7 +89,7 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
       fileFormat = new HiveFileFormat(fileSinkConf),
       committer = committer,
       outputSpec =
-        FileFormatWriter.OutputSpec(outputLocation, customPartitionLocations, allColumns),
+        FileFormatWriter.OutputSpec(outputLocation, customPartitionLocations, outputColumns),
       hadoopConf = hadoopConf,
       partitionColumns = partitionAttributes,
       bucketSpec = None,
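
A note on the signature change above: the allColumns parameter was dropped because every caller passed the command's own output columns back in; saveAsHiveFile now uses outputColumns directly from the DataWritingCommand trait it extends, which is why the call sites in InsertIntoHiveDirCommand and InsertIntoHiveTable each lose one argument.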

http://git-wip-us.apache.org/repos/asf/spark/blob/60e56bca/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 7eaac85..03eb21b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -799,6 +799,25 @@ class HiveDDLSuite
     }
   }
 
+  test("SPARK-25313 Insert overwrite directory should output correct schema") {
+    withSQLConf(CONVERT_METASTORE_PARQUET.key -> "false") {
+      withTable("tbl") {
+        withView("view1") {
+          spark.sql("CREATE TABLE tbl(id long)")
+          spark.sql("INSERT OVERWRITE TABLE tbl VALUES 4")
+          spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl")
+          withTempPath { path =>
+            spark.sql(s"INSERT OVERWRITE LOCAL DIRECTORY '${path.getCanonicalPath}' " +
+              "STORED AS PARQUET SELECT ID FROM view1")
+            val expectedSchema = StructType(Seq(StructField("ID", LongType, true)))
+            assert(spark.read.parquet(path.toString).schema == expectedSchema)
+            checkAnswer(spark.read.parquet(path.toString), Seq(Row(4)))
+          }
+        }
+      }
+    }
+  }
+
   test("alter table partition - storage information") {
     sql("CREATE TABLE boxes (height INT, length INT) PARTITIONED BY (width INT)")
     sql("INSERT OVERWRITE TABLE boxes PARTITION (width=4) SELECT 4, 4")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org