You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/03/02 06:50:30 UTC
spark git commit: [SPARK-19583][SQL] CTAS for data source table with
a created location should succeed
Repository: spark
Updated Branches:
refs/heads/master 89990a010 -> de2b53df4
[SPARK-19583][SQL] CTAS for data source table with a created location should succeed
## What changes were proposed in this pull request?
```
spark.sql(
s"""
|CREATE TABLE t
|USING parquet
|PARTITIONED BY(a, b)
|LOCATION '$dir'
|AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
""".stripMargin)
```
The statement above fails with the error message:
```
path file:/private/var/folders/6r/15tqm8hn3ldb3rmbfqm1gf4c0000gn/T/spark-195cd513-428a-4df9-b196-87db0c73e772 already exists.;
org.apache.spark.sql.AnalysisException: path file:/private/var/folders/6r/15tqm8hn3ldb3rmbfqm1gf4c0000gn/T/spark-195cd513-428a-4df9-b196-87db0c73e772 already exists.;
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:102)
```
The same statement works for Hive tables, so we should fix it for data source tables.
The reason is that the SaveMode check is performed in `InsertIntoHadoopFsRelationCommand`, and that check is applied to the `path`. This is fine when we use `DataFrameWriter.save()`, because in that situation the SaveMode acts on the `path`.
However, when we use `CreateDataSourceTableAsSelectCommand`, the SaveMode acts on the table, and
we have already done the SaveMode check for the table in `CreateDataSourceTableAsSelectCommand`, so we should not do the SaveMode check again for the path in the subsequent logic in `InsertIntoHadoopFsRelationCommand`; that check is redundant and wrong for `CreateDataSourceTableAsSelectCommand`.
After this PR, the following DDL will succeed; when the location already exists, we will append to it or overwrite it.
```
CREATE TABLE ... (PARTITIONED BY ...) LOCATION path AS SELECT ...
```
## How was this patch tested?
Unit tests added.
Author: windpiger <so...@outlook.com>
Closes #16938 from windpiger/CTASDataSourceWitLocation.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/de2b53df
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/de2b53df
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/de2b53df
Branch: refs/heads/master
Commit: de2b53df4c779b265ae038d88f298786a9236234
Parents: 89990a0
Author: windpiger <so...@outlook.com>
Authored: Wed Mar 1 22:50:25 2017 -0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Mar 1 22:50:25 2017 -0800
----------------------------------------------------------------------
.../command/createDataSourceTables.scala | 4 +-
.../spark/sql/execution/command/DDLSuite.scala | 66 ++++++++++---
.../spark/sql/hive/execution/HiveDDLSuite.scala | 99 ++++++++++++++++++++
3 files changed, 156 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/de2b53df/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 5abd579..d835b52 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -141,7 +141,7 @@ case class CreateDataSourceTableAsSelectCommand(
}
saveDataIntoTable(
- sparkSession, table, table.storage.locationUri, query, mode, tableExists = true)
+ sparkSession, table, table.storage.locationUri, query, SaveMode.Append, tableExists = true)
} else {
assert(table.schema.isEmpty)
@@ -151,7 +151,7 @@ case class CreateDataSourceTableAsSelectCommand(
table.storage.locationUri
}
val result = saveDataIntoTable(
- sparkSession, table, tableLocation, query, mode, tableExists = false)
+ sparkSession, table, tableLocation, query, SaveMode.Overwrite, tableExists = false)
val newTable = table.copy(
storage = table.storage.copy(locationUri = tableLocation),
// We will use the schema of resolved.relation as the schema of the table (instead of
http://git-wip-us.apache.org/repos/asf/spark/blob/de2b53df/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index b44f20e..8b8cd0f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1836,18 +1836,17 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
test("insert data to a data source table which has a not existed location should succeed") {
withTable("t") {
withTempDir { dir =>
- val path = dir.toURI.toString.stripSuffix("/")
spark.sql(
s"""
|CREATE TABLE t(a string, b int)
|USING parquet
- |OPTIONS(path "$path")
+ |OPTIONS(path "$dir")
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
- assert(table.location == path)
+ assert(table.location == dir.getAbsolutePath)
dir.delete
- val tableLocFile = new File(table.location.stripPrefix("file:"))
+ val tableLocFile = new File(table.location)
assert(!tableLocFile.exists)
spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
assert(tableLocFile.exists)
@@ -1878,16 +1877,15 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
test("insert into a data source table with no existed partition location should succeed") {
withTable("t") {
withTempDir { dir =>
- val path = dir.toURI.toString.stripSuffix("/")
spark.sql(
s"""
|CREATE TABLE t(a int, b int, c int, d int)
|USING parquet
|PARTITIONED BY(a, b)
- |LOCATION "$path"
+ |LOCATION "$dir"
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
- assert(table.location == path)
+ assert(table.location == dir.getAbsolutePath)
spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)
@@ -1906,15 +1904,14 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
test("read data from a data source table which has a not existed location should succeed") {
withTable("t") {
withTempDir { dir =>
- val path = dir.toURI.toString.stripSuffix("/")
spark.sql(
s"""
|CREATE TABLE t(a string, b int)
|USING parquet
- |OPTIONS(path "$path")
+ |OPTIONS(path "$dir")
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
- assert(table.location == path)
+ assert(table.location == dir.getAbsolutePath)
dir.delete()
checkAnswer(spark.table("t"), Nil)
@@ -1939,7 +1936,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
|CREATE TABLE t(a int, b int, c int, d int)
|USING parquet
|PARTITIONED BY(a, b)
- |LOCATION "${dir.toURI}"
+ |LOCATION "$dir"
""".stripMargin)
spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)
@@ -1952,4 +1949,51 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}
}
+
+ Seq(true, false).foreach { shouldDelete =>
+ val tcName = if (shouldDelete) "non-existent" else "existed"
+ test(s"CTAS for external data source table with a $tcName location") {
+ withTable("t", "t1") {
+ withTempDir {
+ dir =>
+ if (shouldDelete) {
+ dir.delete()
+ }
+ spark.sql(
+ s"""
+ |CREATE TABLE t
+ |USING parquet
+ |LOCATION '$dir'
+ |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+ """.stripMargin)
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+ assert(table.location == dir.getAbsolutePath)
+
+ checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
+ }
+ // partition table
+ withTempDir {
+ dir =>
+ if (shouldDelete) {
+ dir.delete()
+ }
+ spark.sql(
+ s"""
+ |CREATE TABLE t1
+ |USING parquet
+ |PARTITIONED BY(a, b)
+ |LOCATION '$dir'
+ |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+ """.stripMargin)
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+ assert(table.location == dir.getAbsolutePath)
+
+ val partDir = new File(dir, "a=3")
+ assert(partDir.exists())
+
+ checkAnswer(spark.table("t1"), Row(1, 2, 3, 4))
+ }
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/de2b53df/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 792ac1e..81ae5b7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1587,4 +1587,103 @@ class HiveDDLSuite
}
}
}
+
+ Seq(true, false).foreach { shouldDelete =>
+ val tcName = if (shouldDelete) "non-existent" else "existed"
+ test(s"CTAS for external data source table with a $tcName location") {
+ withTable("t", "t1") {
+ withTempDir {
+ dir =>
+ if (shouldDelete) {
+ dir.delete()
+ }
+ spark.sql(
+ s"""
+ |CREATE TABLE t
+ |USING parquet
+ |LOCATION '$dir'
+ |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+ """.stripMargin)
+
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+ assert(table.location == dir.getAbsolutePath)
+
+ checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
+ }
+ // partition table
+ withTempDir {
+ dir =>
+ if (shouldDelete) {
+ dir.delete()
+ }
+ spark.sql(
+ s"""
+ |CREATE TABLE t1
+ |USING parquet
+ |PARTITIONED BY(a, b)
+ |LOCATION '$dir'
+ |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+ """.stripMargin)
+
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+ assert(table.location == dir.getAbsolutePath)
+
+ val partDir = new File(dir, "a=3")
+ assert(partDir.exists())
+
+ checkAnswer(spark.table("t1"), Row(1, 2, 3, 4))
+ }
+ }
+ }
+
+ test(s"CTAS for external hive table with a $tcName location") {
+ withTable("t", "t1") {
+ withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+ withTempDir {
+ dir =>
+ if (shouldDelete) {
+ dir.delete()
+ }
+ spark.sql(
+ s"""
+ |CREATE TABLE t
+ |USING hive
+ |LOCATION '$dir'
+ |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+ """.stripMargin)
+ val dirPath = new Path(dir.getAbsolutePath)
+ val fs = dirPath.getFileSystem(spark.sessionState.newHadoopConf())
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+ assert(new Path(table.location) == fs.makeQualified(dirPath))
+
+ checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
+ }
+ // partition table
+ withTempDir {
+ dir =>
+ if (shouldDelete) {
+ dir.delete()
+ }
+ spark.sql(
+ s"""
+ |CREATE TABLE t1
+ |USING hive
+ |PARTITIONED BY(a, b)
+ |LOCATION '$dir'
+ |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+ """.stripMargin)
+ val dirPath = new Path(dir.getAbsolutePath)
+ val fs = dirPath.getFileSystem(spark.sessionState.newHadoopConf())
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+ assert(new Path(table.location) == fs.makeQualified(dirPath))
+
+ val partDir = new File(dir, "a=3")
+ assert(partDir.exists())
+
+ checkAnswer(spark.table("t1"), Row(1, 2, 3, 4))
+ }
+ }
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org