You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/09/10 05:47:29 UTC
spark git commit: [SPARK-25313][SQL][FOLLOW-UP] Fix
InsertIntoHiveDirCommand output schema in Parquet issue
Repository: spark
Updated Branches:
refs/heads/master a0aed475c -> f8b4d5aaf
[SPARK-25313][SQL][FOLLOW-UP] Fix InsertIntoHiveDirCommand output schema in Parquet issue
## What changes were proposed in this pull request?
How to reproduce:
```scala
spark.sql("CREATE TABLE tbl(id long)")
spark.sql("INSERT OVERWRITE TABLE tbl VALUES 4")
spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl")
spark.sql(s"INSERT OVERWRITE LOCAL DIRECTORY '/tmp/spark/parquet' " +
"STORED AS PARQUET SELECT ID FROM view1")
spark.read.parquet("/tmp/spark/parquet").schema
scala> spark.read.parquet("/tmp/spark/parquet").schema
res10: org.apache.spark.sql.types.StructType = StructType(StructField(id,LongType,true))
```
The schema should be `StructType(StructField(ID,LongType,true))`, because the query is `SELECT ID FROM view1` and the output column name should match the name used in the query.
This PR fixes this issue.
## How was this patch tested?
unit tests
Closes #22359 from wangyum/SPARK-25313-FOLLOW-UP.
Authored-by: Yuming Wang <yu...@ebay.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f8b4d5aa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f8b4d5aa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f8b4d5aa
Branch: refs/heads/master
Commit: f8b4d5aafd1923d9524415601469f8749b3d0811
Parents: a0aed47
Author: Yuming Wang <yu...@ebay.com>
Authored: Mon Sep 10 13:47:19 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Mon Sep 10 13:47:19 2018 +0800
----------------------------------------------------------------------
.../execution/command/DataWritingCommand.scala | 15 ---------------
.../CreateHiveTableAsSelectCommand.scala | 4 ++--
.../execution/InsertIntoHiveDirCommand.scala | 5 ++---
.../sql/hive/execution/InsertIntoHiveTable.scala | 1 -
.../sql/hive/execution/SaveAsHiveFile.scala | 3 +--
.../spark/sql/hive/execution/HiveDDLSuite.scala | 19 +++++++++++++++++++
6 files changed, 24 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/f8b4d5aa/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
index 0a185b8..a1bb5af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
import org.apache.spark.sql.execution.datasources.FileFormatWriter
import org.apache.spark.sql.execution.metric.SQLMetric
-import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration
/**
@@ -75,18 +74,4 @@ object DataWritingCommand {
attr.withName(outputName)
}
}
-
- /**
- * Returns schema of logical plan with provided names.
- * The length of provided names should be the same of the length of [[LogicalPlan.schema]].
- */
- def logicalPlanSchemaWithNames(
- query: LogicalPlan,
- names: Seq[String]): StructType = {
- assert(query.schema.length == names.length,
- "The length of provided names doesn't match the length of query schema.")
- StructType(query.schema.zip(names).map { case (structField, outputName) =>
- structField.copy(name = outputName)
- })
- }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/f8b4d5aa/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index 0eb2f0d..aa573b5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -69,8 +69,8 @@ case class CreateHiveTableAsSelectCommand(
// add the relation into catalog, just in case of failure occurs while data
// processing.
assert(tableDesc.schema.isEmpty)
- val schema = DataWritingCommand.logicalPlanSchemaWithNames(query, outputColumnNames)
- catalog.createTable(tableDesc.copy(schema = schema), ignoreIfExists = false)
+ catalog.createTable(
+ tableDesc.copy(schema = outputColumns.toStructType), ignoreIfExists = false)
try {
// Read back the metadata of the table which was created just now.
http://git-wip-us.apache.org/repos/asf/spark/blob/f8b4d5aa/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
index 0a73aaa..a24e902 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
@@ -66,7 +66,7 @@ case class InsertIntoHiveDirCommand(
identifier = TableIdentifier(storage.locationUri.get.toString, Some("default")),
tableType = org.apache.spark.sql.catalyst.catalog.CatalogTableType.VIEW,
storage = storage,
- schema = query.schema
+ schema = outputColumns.toStructType
))
hiveTable.getMetadata.put(serdeConstants.SERIALIZATION_LIB,
storage.serde.getOrElse(classOf[LazySimpleSerDe].getName))
@@ -104,8 +104,7 @@ case class InsertIntoHiveDirCommand(
plan = child,
hadoopConf = hadoopConf,
fileSinkConf = fileSinkConf,
- outputLocation = tmpPath.toString,
- allColumns = outputColumns)
+ outputLocation = tmpPath.toString)
val fs = writeToPath.getFileSystem(hadoopConf)
if (overwrite && fs.exists(writeToPath)) {
http://git-wip-us.apache.org/repos/asf/spark/blob/f8b4d5aa/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 75a0563..0ed464d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -198,7 +198,6 @@ case class InsertIntoHiveTable(
hadoopConf = hadoopConf,
fileSinkConf = fileSinkConf,
outputLocation = tmpLocation.toString,
- allColumns = outputColumns,
partitionAttributes = partitionAttributes)
if (partition.nonEmpty) {
http://git-wip-us.apache.org/repos/asf/spark/blob/f8b4d5aa/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
index e0f7375..078968e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
@@ -51,7 +51,6 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
hadoopConf: Configuration,
fileSinkConf: FileSinkDesc,
outputLocation: String,
- allColumns: Seq[Attribute],
customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty,
partitionAttributes: Seq[Attribute] = Nil): Set[String] = {
@@ -90,7 +89,7 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
fileFormat = new HiveFileFormat(fileSinkConf),
committer = committer,
outputSpec =
- FileFormatWriter.OutputSpec(outputLocation, customPartitionLocations, allColumns),
+ FileFormatWriter.OutputSpec(outputLocation, customPartitionLocations, outputColumns),
hadoopConf = hadoopConf,
partitionColumns = partitionAttributes,
bucketSpec = None,
http://git-wip-us.apache.org/repos/asf/spark/blob/f8b4d5aa/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 9acd5e1..69ee2bb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -803,6 +803,25 @@ class HiveDDLSuite
}
}
+ test("SPARK-25313 Insert overwrite directory should output correct schema") {
+ withSQLConf(CONVERT_METASTORE_PARQUET.key -> "false") {
+ withTable("tbl") {
+ withView("view1") {
+ spark.sql("CREATE TABLE tbl(id long)")
+ spark.sql("INSERT OVERWRITE TABLE tbl VALUES 4")
+ spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl")
+ withTempPath { path =>
+ spark.sql(s"INSERT OVERWRITE LOCAL DIRECTORY '${path.getCanonicalPath}' " +
+ "STORED AS PARQUET SELECT ID FROM view1")
+ val expectedSchema = StructType(Seq(StructField("ID", LongType, true)))
+ assert(spark.read.parquet(path.toString).schema == expectedSchema)
+ checkAnswer(spark.read.parquet(path.toString), Seq(Row(4)))
+ }
+ }
+ }
+ }
+ }
+
test("alter table partition - storage information") {
sql("CREATE TABLE boxes (height INT, length INT) PARTITIONED BY (width INT)")
sql("INSERT OVERWRITE TABLE boxes PARTITION (width=4) SELECT 4, 4")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org