You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/01/23 11:06:10 UTC

spark git commit: [SPARK-19284][SQL] append to partitioned datasource table should not use a custom partition location

Repository: spark
Updated Branches:
  refs/heads/master c99492141 -> 0ef1421a6


[SPARK-19284][SQL] append to partitioned datasource table should not use a custom partition location

## What changes were proposed in this pull request?

When we append data to an existing partitioned datasource table, InsertIntoHadoopFsRelationCommand.getCustomPartitionLocations currently
returns the same location as the Hive default; it should return None instead.

## How was this patch tested?

Author: windpiger <so...@outlook.com>

Closes #16642 from windpiger/appendSchema.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0ef1421a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0ef1421a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0ef1421a

Branch: refs/heads/master
Commit: 0ef1421a645be79857ef96a90464e0e669190dcf
Parents: c994921
Author: windpiger <so...@outlook.com>
Authored: Mon Jan 23 19:06:04 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Mon Jan 23 19:06:04 2017 +0800

----------------------------------------------------------------------
 .../command/createDataSourceTables.scala        |  3 +-
 .../spark/sql/execution/datasources/rules.scala |  6 ++--
 .../sql/sources/PartitionedWriteSuite.scala     | 32 ++++++++++++++++++++
 3 files changed, 36 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0ef1421a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 052efe5..5abd579 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -122,7 +122,6 @@ case class CreateDataSourceTableAsSelectCommand(
   override def run(sparkSession: SparkSession): Seq[Row] = {
     assert(table.tableType != CatalogTableType.VIEW)
     assert(table.provider.isDefined)
-    assert(table.schema.isEmpty)
 
     val sessionState = sparkSession.sessionState
     val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
@@ -144,6 +143,8 @@ case class CreateDataSourceTableAsSelectCommand(
       saveDataIntoTable(
         sparkSession, table, table.storage.locationUri, query, mode, tableExists = true)
     } else {
+      assert(table.schema.isEmpty)
+
       val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
         Some(sessionState.catalog.defaultTablePath(table.identifier))
       } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/0ef1421a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index beacb08..f8c7fca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -95,7 +95,7 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
     case c @ CreateTable(tableDesc, SaveMode.Append, Some(query))
         if sparkSession.sessionState.catalog.tableExists(tableDesc.identifier) =>
       // This is guaranteed by the parser and `DataFrameWriter`
-      assert(tableDesc.schema.isEmpty && tableDesc.provider.isDefined)
+      assert(tableDesc.provider.isDefined)
 
       // Analyze the query in CTAS and then we can do the normalization and checking.
       val qe = sparkSession.sessionState.executePlan(query)
@@ -186,9 +186,7 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
       }
 
       c.copy(
-        // trust everything from the existing table, except schema as we assume it's empty in a lot
-        // of places, when we do CTAS.
-        tableDesc = existingTable.copy(schema = new StructType()),
+        tableDesc = existingTable,
         query = Some(newQuery))
 
     // Here we normalize partition, bucket and sort column names, w.r.t. the case sensitivity

http://git-wip-us.apache.org/repos/asf/spark/blob/0ef1421a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
index 953604e..bf7fabe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
@@ -19,11 +19,31 @@ package org.apache.spark.sql.sources
 
 import java.io.File
 
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.Utils
 
+private class OnlyDetectCustomPathFileCommitProtocol(jobId: String, path: String, isAppend: Boolean)
+  extends SQLHadoopMapReduceCommitProtocol(jobId, path, isAppend)
+    with Serializable with Logging {
+
+  override def newTaskTempFileAbsPath(
+      taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
+    if (isAppend) {
+      throw new Exception("append data to an existed partitioned table, " +
+        "there should be no custom partition path sent to Task")
+    }
+
+    super.newTaskTempFileAbsPath(taskContext, absoluteDir, ext)
+  }
+}
+
 class PartitionedWriteSuite extends QueryTest with SharedSQLContext {
   import testImplicits._
 
@@ -92,6 +112,18 @@ class PartitionedWriteSuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("append data to an existed partitioned table without custom partition path") {
+    withTable("t") {
+      withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
+        classOf[OnlyDetectCustomPathFileCommitProtocol].getName) {
+        Seq((1, 2)).toDF("a", "b").write.partitionBy("b").saveAsTable("t")
+        // if custom partition path is detected by the task, it will throw an Exception
+        // from OnlyDetectCustomPathFileCommitProtocol above.
+        Seq((3, 2)).toDF("a", "b").write.mode("append").partitionBy("b").saveAsTable("t")
+      }
+    }
+  }
+
   /** Lists files recursively. */
   private def recursiveList(f: File): Array[File] = {
     require(f.isDirectory)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org