You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/11/29 05:58:06 UTC

spark git commit: [SPARK-18544][SQL] Append with df.saveAsTable writes data to wrong location

Repository: spark
Updated Branches:
  refs/heads/master d449988b8 -> e2318ede0


[SPARK-18544][SQL] Append with df.saveAsTable writes data to wrong location

## What changes were proposed in this pull request?

The saveAsTable command did not propagate the existing table's metadata (its storage format and table type) when appending to a table that already exists. As a result, the write path treated the table as MANAGED and wrote the data to the wrong location.

## How was this patch tested?

Added a unit test to PartitionProviderCompatibilitySuite that fails without this patch.

Author: Eric Liang <ek...@databricks.com>

Closes #15983 from ericl/spark-18544.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e2318ede
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e2318ede
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e2318ede

Branch: refs/heads/master
Commit: e2318ede04fa7a756d1c8151775e1f2406a176ca
Parents: d449988
Author: Eric Liang <ek...@databricks.com>
Authored: Mon Nov 28 21:58:01 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Mon Nov 28 21:58:01 2016 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameWriter.scala  | 21 ++++++++++++--------
 .../command/createDataSourceTables.scala        |  3 ++-
 .../PartitionProviderCompatibilitySuite.scala   | 19 ++++++++++++++++++
 3 files changed, 34 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e2318ede/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 2d86342..8294e41 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -373,8 +373,19 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         throw new AnalysisException(s"Table $tableIdent already exists.")
 
       case _ =>
-        val storage = DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
-        val tableType = if (storage.locationUri.isDefined) {
+        val existingTable = if (tableExists) {
+          Some(df.sparkSession.sessionState.catalog.getTableMetadata(tableIdent))
+        } else {
+          None
+        }
+        val storage = if (tableExists) {
+          existingTable.get.storage
+        } else {
+          DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
+        }
+        val tableType = if (tableExists) {
+          existingTable.get.tableType
+        } else if (storage.locationUri.isDefined) {
           CatalogTableType.EXTERNAL
         } else {
           CatalogTableType.MANAGED
@@ -391,12 +402,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         )
         df.sparkSession.sessionState.executePlan(
           CreateTable(tableDesc, mode, Some(df.logicalPlan))).toRdd
-        if (tableDesc.partitionColumnNames.nonEmpty &&
-            df.sparkSession.sqlContext.conf.manageFilesourcePartitions) {
-          // Need to recover partitions into the metastore so our saved data is visible.
-          df.sparkSession.sessionState.executePlan(
-            AlterTableRecoverPartitionsCommand(tableDesc.identifier)).toRdd
-        }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e2318ede/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index add732c..422700c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -212,7 +212,8 @@ case class CreateDataSourceTableAsSelectCommand(
       className = provider,
       partitionColumns = table.partitionColumnNames,
       bucketSpec = table.bucketSpec,
-      options = table.storage.properties ++ pathOption)
+      options = table.storage.properties ++ pathOption,
+      catalogTable = Some(table))
 
     val result = try {
       dataSource.write(mode, df)

http://git-wip-us.apache.org/repos/asf/spark/blob/e2318ede/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
index a1aa074..cace5fa 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
@@ -188,6 +188,25 @@ class PartitionProviderCompatibilitySuite
     }
   }
 
+  for (enabled <- Seq(true, false)) {
+    test(s"SPARK-18544 append with saveAsTable - partition management $enabled") {
+      withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> enabled.toString) {
+        withTable("test") {
+          withTempDir { dir =>
+            setupPartitionedDatasourceTable("test", dir)
+            if (enabled) {
+              spark.sql("msck repair table test")
+            }
+            assert(spark.sql("select * from test").count() == 5)
+            spark.range(10).selectExpr("id as fieldOne", "id as partCol")
+              .write.partitionBy("partCol").mode("append").saveAsTable("test")
+            assert(spark.sql("select * from test").count() == 15)
+          }
+        }
+      }
+    }
+  }
+
   /**
    * Runs a test against a multi-level partitioned table, then validates that the custom locations
    * were respected by the output writer.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org