Posted to commits@spark.apache.org by rx...@apache.org on 2016/11/29 05:58:06 UTC
spark git commit: [SPARK-18544][SQL] Append with df.saveAsTable writes data to wrong location
Repository: spark
Updated Branches:
refs/heads/master d449988b8 -> e2318ede0
[SPARK-18544][SQL] Append with df.saveAsTable writes data to wrong location
## What changes were proposed in this pull request?
We failed to propagate the metadata of existing tables (their storage format and table type) through the saveAsTable command. This caused a downstream component to treat an EXTERNAL table as MANAGED, writing its data to the wrong location.
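For illustration, a minimal sketch of the failure mode (the table name, schema, and path below are hypothetical; the append itself mirrors the regression test in this patch):

```scala
// Hypothetical repro sketch for SPARK-18544; names and paths are illustrative.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// An EXTERNAL datasource table whose data lives at a user-supplied path.
spark.sql(
  """CREATE TABLE test (fieldOne BIGINT, partCol BIGINT)
    |USING parquet
    |PARTITIONED BY (partCol)
    |LOCATION '/tmp/spark-18544-data'""".stripMargin)

// Before this patch, the append below rebuilt the table metadata from the
// writer's options: with no explicit path option set, the table was treated
// as MANAGED, so the new rows landed under the warehouse directory instead
// of '/tmp/spark-18544-data'.
spark.range(10)
  .selectExpr("id as fieldOne", "id as partCol")
  .write
  .partitionBy("partCol")
  .mode("append")
  .saveAsTable("test")
```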
## How was this patch tested?
Unit test that fails before the patch.
Author: Eric Liang <ek...@databricks.com>
Closes #15983 from ericl/spark-18544.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e2318ede
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e2318ede
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e2318ede
Branch: refs/heads/master
Commit: e2318ede04fa7a756d1c8151775e1f2406a176ca
Parents: d449988
Author: Eric Liang <ek...@databricks.com>
Authored: Mon Nov 28 21:58:01 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Mon Nov 28 21:58:01 2016 -0800
----------------------------------------------------------------------
.../org/apache/spark/sql/DataFrameWriter.scala | 21 ++++++++++++--------
.../command/createDataSourceTables.scala | 3 ++-
.../PartitionProviderCompatibilitySuite.scala | 19 ++++++++++++++++++
3 files changed, 34 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/e2318ede/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 2d86342..8294e41 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -373,8 +373,19 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
throw new AnalysisException(s"Table $tableIdent already exists.")
case _ =>
- val storage = DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
- val tableType = if (storage.locationUri.isDefined) {
+ val existingTable = if (tableExists) {
+ Some(df.sparkSession.sessionState.catalog.getTableMetadata(tableIdent))
+ } else {
+ None
+ }
+ val storage = if (tableExists) {
+ existingTable.get.storage
+ } else {
+ DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
+ }
+ val tableType = if (tableExists) {
+ existingTable.get.tableType
+ } else if (storage.locationUri.isDefined) {
CatalogTableType.EXTERNAL
} else {
CatalogTableType.MANAGED
@@ -391,12 +402,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
)
df.sparkSession.sessionState.executePlan(
CreateTable(tableDesc, mode, Some(df.logicalPlan))).toRdd
- if (tableDesc.partitionColumnNames.nonEmpty &&
- df.sparkSession.sqlContext.conf.manageFilesourcePartitions) {
- // Need to recover partitions into the metastore so our saved data is visible.
- df.sparkSession.sessionState.executePlan(
- AlterTableRecoverPartitionsCommand(tableDesc.identifier)).toRdd
- }
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/e2318ede/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index add732c..422700c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -212,7 +212,8 @@ case class CreateDataSourceTableAsSelectCommand(
className = provider,
partitionColumns = table.partitionColumnNames,
bucketSpec = table.bucketSpec,
- options = table.storage.properties ++ pathOption)
+ options = table.storage.properties ++ pathOption,
+ catalogTable = Some(table))
val result = try {
dataSource.write(mode, df)
http://git-wip-us.apache.org/repos/asf/spark/blob/e2318ede/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
index a1aa074..cace5fa 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
@@ -188,6 +188,25 @@ class PartitionProviderCompatibilitySuite
}
}
+ for (enabled <- Seq(true, false)) {
+ test(s"SPARK-18544 append with saveAsTable - partition management $enabled") {
+ withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> enabled.toString) {
+ withTable("test") {
+ withTempDir { dir =>
+ setupPartitionedDatasourceTable("test", dir)
+ if (enabled) {
+ spark.sql("msck repair table test")
+ }
+ assert(spark.sql("select * from test").count() == 5)
+ spark.range(10).selectExpr("id as fieldOne", "id as partCol")
+ .write.partitionBy("partCol").mode("append").saveAsTable("test")
+ assert(spark.sql("select * from test").count() == 15)
+ }
+ }
+ }
+ }
+ }
+
/**
* Runs a test against a multi-level partitioned table, then validates that the custom locations
* were respected by the output writer.
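As a sanity check on the hypothetical repro sketched above (illustrative only): after the fix, the writer reuses the catalog's stored storage format and table type instead of rebuilding them from writer options, so the table should remain EXTERNAL at its original location after the append.

```scala
// Inspect the catalog metadata the patched writer now reuses: 'Type' should
// remain EXTERNAL and 'Location' should stay the original user-supplied path.
spark.sql("DESCRIBE FORMATTED test").show(100, truncate = false)

// All appended rows are visible through the table.
assert(spark.sql("SELECT * FROM test").count() == 10)
```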