You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/03/11 04:59:36 UTC
spark git commit: [SPARK-19723][SQL] create datasource table with a
non-existent location should work
Repository: spark
Updated Branches:
refs/heads/master fb9beda54 -> f6fdf92d0
[SPARK-19723][SQL] create datasource table with a non-existent location should work
## What changes were proposed in this pull request?
This JIRA is follow-up work to [SPARK-19583](https://issues.apache.org/jira/browse/SPARK-19583).
As discussed in that [PR](https://github.com/apache/spark/pull/16938),
the following DDL for a datasource table with a non-existent location should work:
```
CREATE TABLE ... (PARTITIONED BY ...) LOCATION path
```
Currently, creating a datasource table this way throws an exception saying that the path does not exist.
## How was this patch tested?
Unit tests added.
Author: windpiger <so...@outlook.com>
Closes #17055 from windpiger/CTDataSourcePathNotExists.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6fdf92d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6fdf92d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6fdf92d
Branch: refs/heads/master
Commit: f6fdf92d0dce2cb3340f3e2ff768e09ef69176cd
Parents: fb9beda
Author: windpiger <so...@outlook.com>
Authored: Fri Mar 10 20:59:32 2017 -0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Mar 10 20:59:32 2017 -0800
----------------------------------------------------------------------
.../command/createDataSourceTables.scala | 3 +-
.../spark/sql/execution/command/DDLSuite.scala | 106 +++++++++++-------
.../spark/sql/hive/execution/HiveDDLSuite.scala | 111 ++++++++-----------
3 files changed, 115 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/f6fdf92d/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 3da66af..2d89011 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -73,7 +73,8 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
className = table.provider.get,
bucketSpec = table.bucketSpec,
options = table.storage.properties ++ pathOption,
- catalogTable = Some(tableWithDefaultOptions)).resolveRelation()
+ // As discussed in SPARK-19583, we don't check if the location is existed
+ catalogTable = Some(tableWithDefaultOptions)).resolveRelation(checkFilesExist = false)
val partitionColumnNames = if (table.schema.nonEmpty) {
table.partitionColumnNames
http://git-wip-us.apache.org/repos/asf/spark/blob/f6fdf92d/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 5f70a8c..0666f44 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -230,7 +230,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}
private def getDBPath(dbName: String): URI = {
- val warehousePath = makeQualifiedPath(s"${spark.sessionState.conf.warehousePath}")
+ val warehousePath = makeQualifiedPath(spark.sessionState.conf.warehousePath)
new Path(CatalogUtils.URIToString(warehousePath), s"$dbName.db").toUri
}
@@ -1899,7 +1899,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}
}
- test("insert data to a data source table which has a not existed location should succeed") {
+ test("insert data to a data source table which has a non-existing location should succeed") {
withTable("t") {
withTempDir { dir =>
spark.sql(
@@ -1939,7 +1939,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}
}
- test("insert into a data source table with no existed partition location should succeed") {
+ test("insert into a data source table with a non-existing partition location should succeed") {
withTable("t") {
withTempDir { dir =>
spark.sql(
@@ -1966,7 +1966,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}
}
- test("read data from a data source table which has a not existed location should succeed") {
+ test("read data from a data source table which has a non-existing location should succeed") {
withTable("t") {
withTempDir { dir =>
spark.sql(
@@ -1994,7 +1994,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}
}
- test("read data from a data source table with no existed partition location should succeed") {
+ test("read data from a data source table with non-existing partition location should succeed") {
withTable("t") {
withTempDir { dir =>
spark.sql(
@@ -2016,48 +2016,72 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}
}
+ test("create datasource table with a non-existing location") {
+ withTable("t", "t1") {
+ withTempPath { dir =>
+ spark.sql(s"CREATE TABLE t(a int, b int) USING parquet LOCATION '$dir'")
+
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+ assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+
+ spark.sql("INSERT INTO TABLE t SELECT 1, 2")
+ assert(dir.exists())
+
+ checkAnswer(spark.table("t"), Row(1, 2))
+ }
+ // partition table
+ withTempPath { dir =>
+ spark.sql(s"CREATE TABLE t1(a int, b int) USING parquet PARTITIONED BY(a) LOCATION '$dir'")
+
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+ assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+
+ spark.sql("INSERT INTO TABLE t1 PARTITION(a=1) SELECT 2")
+
+ val partDir = new File(dir, "a=1")
+ assert(partDir.exists())
+
+ checkAnswer(spark.table("t1"), Row(2, 1))
+ }
+ }
+ }
+
Seq(true, false).foreach { shouldDelete =>
- val tcName = if (shouldDelete) "non-existent" else "existed"
+ val tcName = if (shouldDelete) "non-existing" else "existed"
test(s"CTAS for external data source table with a $tcName location") {
withTable("t", "t1") {
- withTempDir {
- dir =>
- if (shouldDelete) {
- dir.delete()
- }
- spark.sql(
- s"""
- |CREATE TABLE t
- |USING parquet
- |LOCATION '$dir'
- |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
- """.stripMargin)
- val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
- assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+ withTempDir { dir =>
+ if (shouldDelete) dir.delete()
+ spark.sql(
+ s"""
+ |CREATE TABLE t
+ |USING parquet
+ |LOCATION '$dir'
+ |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+ """.stripMargin)
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+ assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
- checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
+ checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
}
// partition table
- withTempDir {
- dir =>
- if (shouldDelete) {
- dir.delete()
- }
- spark.sql(
- s"""
- |CREATE TABLE t1
- |USING parquet
- |PARTITIONED BY(a, b)
- |LOCATION '$dir'
- |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
- """.stripMargin)
- val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
- assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
-
- val partDir = new File(dir, "a=3")
- assert(partDir.exists())
-
- checkAnswer(spark.table("t1"), Row(1, 2, 3, 4))
+ withTempDir { dir =>
+ if (shouldDelete) dir.delete()
+ spark.sql(
+ s"""
+ |CREATE TABLE t1
+ |USING parquet
+ |PARTITIONED BY(a, b)
+ |LOCATION '$dir'
+ |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+ """.stripMargin)
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+ assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+
+ val partDir = new File(dir, "a=3")
+ assert(partDir.exists())
+
+ checkAnswer(spark.table("t1"), Row(1, 2, 3, 4))
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/f6fdf92d/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 79ad156..d29242b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1663,43 +1663,73 @@ class HiveDDLSuite
}
}
+ test("create hive table with a non-existing location") {
+ withTable("t", "t1") {
+ withTempPath { dir =>
+ spark.sql(s"CREATE TABLE t(a int, b int) USING hive LOCATION '$dir'")
+
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+ assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+
+ spark.sql("INSERT INTO TABLE t SELECT 1, 2")
+ assert(dir.exists())
+
+ checkAnswer(spark.table("t"), Row(1, 2))
+ }
+ // partition table
+ withTempPath { dir =>
+ spark.sql(
+ s"""
+ |CREATE TABLE t1(a int, b int)
+ |USING hive
+ |PARTITIONED BY(a)
+ |LOCATION '$dir'
+ """.stripMargin)
+
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+ assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+
+ spark.sql("INSERT INTO TABLE t1 PARTITION(a=1) SELECT 2")
+
+ val partDir = new File(dir, "a=1")
+ assert(partDir.exists())
+
+ checkAnswer(spark.table("t1"), Row(2, 1))
+ }
+ }
+ }
+
Seq(true, false).foreach { shouldDelete =>
- val tcName = if (shouldDelete) "non-existent" else "existed"
- test(s"CTAS for external data source table with a $tcName location") {
+ val tcName = if (shouldDelete) "non-existing" else "existed"
+
+ test(s"CTAS for external hive table with a $tcName location") {
withTable("t", "t1") {
- withTempDir {
- dir =>
- if (shouldDelete) {
- dir.delete()
- }
+ withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+ withTempDir { dir =>
+ if (shouldDelete) dir.delete()
spark.sql(
s"""
|CREATE TABLE t
- |USING parquet
+ |USING hive
|LOCATION '$dir'
|AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
""".stripMargin)
-
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
- }
- // partition table
- withTempDir {
- dir =>
- if (shouldDelete) {
- dir.delete()
- }
+ }
+ // partition table
+ withTempDir { dir =>
+ if (shouldDelete) dir.delete()
spark.sql(
s"""
|CREATE TABLE t1
- |USING parquet
+ |USING hive
|PARTITIONED BY(a, b)
|LOCATION '$dir'
|AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
""".stripMargin)
-
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
@@ -1707,51 +1737,6 @@ class HiveDDLSuite
assert(partDir.exists())
checkAnswer(spark.table("t1"), Row(1, 2, 3, 4))
- }
- }
- }
-
- test(s"CTAS for external hive table with a $tcName location") {
- withTable("t", "t1") {
- withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
- withTempDir {
- dir =>
- if (shouldDelete) {
- dir.delete()
- }
- spark.sql(
- s"""
- |CREATE TABLE t
- |USING hive
- |LOCATION '$dir'
- |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
- """.stripMargin)
- val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
- assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
-
- checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
- }
- // partition table
- withTempDir {
- dir =>
- if (shouldDelete) {
- dir.delete()
- }
- spark.sql(
- s"""
- |CREATE TABLE t1
- |USING hive
- |PARTITIONED BY(a, b)
- |LOCATION '$dir'
- |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
- """.stripMargin)
- val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
- assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
-
- val partDir = new File(dir, "a=3")
- assert(partDir.exists())
-
- checkAnswer(spark.table("t1"), Row(1, 2, 3, 4))
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org