You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/01/23 11:06:10 UTC
spark git commit: [SPARK-19284][SQL] append to a partitioned datasource table should not use a custom partition location
Repository: spark
Updated Branches:
refs/heads/master c99492141 -> 0ef1421a6
[SPARK-19284][SQL] append to a partitioned datasource table should not use a custom partition location
## What changes were proposed in this pull request?
When we append data to an existing partitioned data source table, `InsertIntoHadoopFsRelationCommand.getCustomPartitionLocations` currently
returns the same location as the Hive default; it should return None instead.
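## How was this patch tested?
A unit test was added to `PartitionedWriteSuite` ("append data to an existed partitioned table without custom partition path").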
Author: windpiger <so...@outlook.com>
Closes #16642 from windpiger/appendSchema.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0ef1421a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0ef1421a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0ef1421a
Branch: refs/heads/master
Commit: 0ef1421a645be79857ef96a90464e0e669190dcf
Parents: c994921
Author: windpiger <so...@outlook.com>
Authored: Mon Jan 23 19:06:04 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Mon Jan 23 19:06:04 2017 +0800
----------------------------------------------------------------------
.../command/createDataSourceTables.scala | 3 +-
.../spark/sql/execution/datasources/rules.scala | 6 ++--
.../sql/sources/PartitionedWriteSuite.scala | 32 ++++++++++++++++++++
3 files changed, 36 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/0ef1421a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 052efe5..5abd579 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -122,7 +122,6 @@ case class CreateDataSourceTableAsSelectCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
assert(table.tableType != CatalogTableType.VIEW)
assert(table.provider.isDefined)
- assert(table.schema.isEmpty)
val sessionState = sparkSession.sessionState
val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
@@ -144,6 +143,8 @@ case class CreateDataSourceTableAsSelectCommand(
saveDataIntoTable(
sparkSession, table, table.storage.locationUri, query, mode, tableExists = true)
} else {
+ assert(table.schema.isEmpty)
+
val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
Some(sessionState.catalog.defaultTablePath(table.identifier))
} else {
http://git-wip-us.apache.org/repos/asf/spark/blob/0ef1421a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index beacb08..f8c7fca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -95,7 +95,7 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
case c @ CreateTable(tableDesc, SaveMode.Append, Some(query))
if sparkSession.sessionState.catalog.tableExists(tableDesc.identifier) =>
// This is guaranteed by the parser and `DataFrameWriter`
- assert(tableDesc.schema.isEmpty && tableDesc.provider.isDefined)
+ assert(tableDesc.provider.isDefined)
// Analyze the query in CTAS and then we can do the normalization and checking.
val qe = sparkSession.sessionState.executePlan(query)
@@ -186,9 +186,7 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
}
c.copy(
- // trust everything from the existing table, except schema as we assume it's empty in a lot
- // of places, when we do CTAS.
- tableDesc = existingTable.copy(schema = new StructType()),
+ tableDesc = existingTable,
query = Some(newQuery))
// Here we normalize partition, bucket and sort column names, w.r.t. the case sensitivity
http://git-wip-us.apache.org/repos/asf/spark/blob/0ef1421a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
index 953604e..bf7fabe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
@@ -19,11 +19,31 @@ package org.apache.spark.sql.sources
import java.io.File
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+import org.apache.spark.internal.Logging
import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.util.Utils
+private class OnlyDetectCustomPathFileCommitProtocol(jobId: String, path: String, isAppend: Boolean)
+ extends SQLHadoopMapReduceCommitProtocol(jobId, path, isAppend)
+ with Serializable with Logging {
+
+ override def newTaskTempFileAbsPath(
+ taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
+ if (isAppend) {
+ throw new Exception("append data to an existed partitioned table, " +
+ "there should be no custom partition path sent to Task")
+ }
+
+ super.newTaskTempFileAbsPath(taskContext, absoluteDir, ext)
+ }
+}
+
class PartitionedWriteSuite extends QueryTest with SharedSQLContext {
import testImplicits._
@@ -92,6 +112,18 @@ class PartitionedWriteSuite extends QueryTest with SharedSQLContext {
}
}
+ test("append data to an existed partitioned table without custom partition path") {
+ withTable("t") {
+ withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
+ classOf[OnlyDetectCustomPathFileCommitProtocol].getName) {
+ Seq((1, 2)).toDF("a", "b").write.partitionBy("b").saveAsTable("t")
+ // if custom partition path is detected by the task, it will throw an Exception
+ // from OnlyDetectCustomPathFileCommitProtocol above.
+ Seq((3, 2)).toDF("a", "b").write.mode("append").partitionBy("b").saveAsTable("t")
+ }
+ }
+ }
+
/** Lists files recursively. */
private def recursiveList(f: File): Array[File] = {
require(f.isDirectory)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org