Posted to commits@spark.apache.org by we...@apache.org on 2017/03/11 04:59:36 UTC

spark git commit: [SPARK-19723][SQL] create datasource table with a non-existent location should work

Repository: spark
Updated Branches:
  refs/heads/master fb9beda54 -> f6fdf92d0


[SPARK-19723][SQL] create datasource table with a non-existent location should work

## What changes were proposed in this pull request?

This JIRA is follow-up work to [SPARK-19583](https://issues.apache.org/jira/browse/SPARK-19583).

As discussed in that [PR](https://github.com/apache/spark/pull/16938), the following DDL for a datasource table with a non-existent location should work:
```
CREATE TABLE ... (PARTITIONED BY ...) LOCATION path
```
Currently this throws an exception saying the path does not exist for a datasource table.
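
The mechanical core of the fix (shown in context in the diff below) is to skip the file-existence check when resolving the relation at table-creation time. A minimal sketch of the changed call site in `CreateDataSourceTableCommand`; only the named arguments and the `resolveRelation` call are taken from the hunk, the positional `sparkSession` argument is assumed from the surrounding code:

```
// Sketch, not the full method; see the diff below for the real context.
val relation = DataSource(
    sparkSession,
    className = table.provider.get,
    bucketSpec = table.bucketSpec,
    options = table.storage.properties ++ pathOption,
    catalogTable = Some(tableWithDefaultOptions))
  // Skip the existence check so a non-existent LOCATION is accepted;
  // the default (checkFilesExist = true) is what raised the error before.
  .resolveRelation(checkFilesExist = false)
```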

## How was this patch tested?
Unit tests were added in `DDLSuite` and `HiveDDLSuite`.
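
Condensed, the added tests follow this pattern (the path is a hypothetical stand-in; the real tests use `withTempPath` to obtain a directory that does not exist yet):

```
// Condensed from the added DDLSuite test; assumes a SparkSession `spark`
// and a java.io.File `dir` pointing at a directory that does not exist.
spark.sql(s"CREATE TABLE t(a int, b int) USING parquet LOCATION '$dir'")
spark.sql("INSERT INTO TABLE t SELECT 1, 2")  // first insert creates the directory
assert(dir.exists())
checkAnswer(spark.table("t"), Row(1, 2))
```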

Author: windpiger <so...@outlook.com>

Closes #17055 from windpiger/CTDataSourcePathNotExists.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6fdf92d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6fdf92d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6fdf92d

Branch: refs/heads/master
Commit: f6fdf92d0dce2cb3340f3e2ff768e09ef69176cd
Parents: fb9beda
Author: windpiger <so...@outlook.com>
Authored: Fri Mar 10 20:59:32 2017 -0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Mar 10 20:59:32 2017 -0800

----------------------------------------------------------------------
 .../command/createDataSourceTables.scala        |   3 +-
 .../spark/sql/execution/command/DDLSuite.scala  | 106 +++++++++++-------
 .../spark/sql/hive/execution/HiveDDLSuite.scala | 111 ++++++++-----------
 3 files changed, 115 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f6fdf92d/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 3da66af..2d89011 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -73,7 +73,8 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
         className = table.provider.get,
         bucketSpec = table.bucketSpec,
         options = table.storage.properties ++ pathOption,
-        catalogTable = Some(tableWithDefaultOptions)).resolveRelation()
+        // As discussed in SPARK-19583, we don't check if the location exists
+        catalogTable = Some(tableWithDefaultOptions)).resolveRelation(checkFilesExist = false)
 
     val partitionColumnNames = if (table.schema.nonEmpty) {
       table.partitionColumnNames

http://git-wip-us.apache.org/repos/asf/spark/blob/f6fdf92d/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 5f70a8c..0666f44 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -230,7 +230,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
   }
 
   private def getDBPath(dbName: String): URI = {
-    val warehousePath = makeQualifiedPath(s"${spark.sessionState.conf.warehousePath}")
+    val warehousePath = makeQualifiedPath(spark.sessionState.conf.warehousePath)
     new Path(CatalogUtils.URIToString(warehousePath), s"$dbName.db").toUri
   }
 
@@ -1899,7 +1899,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     }
   }
 
-  test("insert data to a data source table which has a not existed location should succeed") {
+  test("insert data to a data source table which has a non-existing location should succeed") {
     withTable("t") {
       withTempDir { dir =>
         spark.sql(
@@ -1939,7 +1939,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     }
   }
 
-  test("insert into a data source table with no existed partition location should succeed") {
+  test("insert into a data source table with a non-existing partition location should succeed") {
     withTable("t") {
       withTempDir { dir =>
         spark.sql(
@@ -1966,7 +1966,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     }
   }
 
-  test("read data from a data source table which has a not existed location should succeed") {
+  test("read data from a data source table which has a non-existing location should succeed") {
     withTable("t") {
       withTempDir { dir =>
         spark.sql(
@@ -1994,7 +1994,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     }
   }
 
-  test("read data from a data source table with no existed partition location should succeed") {
+  test("read data from a data source table with non-existing partition location should succeed") {
     withTable("t") {
       withTempDir { dir =>
         spark.sql(
@@ -2016,48 +2016,72 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     }
   }
 
+  test("create datasource table with a non-existing location") {
+    withTable("t", "t1") {
+      withTempPath { dir =>
+        spark.sql(s"CREATE TABLE t(a int, b int) USING parquet LOCATION '$dir'")
+
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+
+        spark.sql("INSERT INTO TABLE t SELECT 1, 2")
+        assert(dir.exists())
+
+        checkAnswer(spark.table("t"), Row(1, 2))
+      }
+      // partition table
+      withTempPath { dir =>
+        spark.sql(s"CREATE TABLE t1(a int, b int) USING parquet PARTITIONED BY(a) LOCATION '$dir'")
+
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+        assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+
+        spark.sql("INSERT INTO TABLE t1 PARTITION(a=1) SELECT 2")
+
+        val partDir = new File(dir, "a=1")
+        assert(partDir.exists())
+
+        checkAnswer(spark.table("t1"), Row(2, 1))
+      }
+    }
+  }
+
   Seq(true, false).foreach { shouldDelete =>
-    val tcName = if (shouldDelete) "non-existent" else "existed"
+    val tcName = if (shouldDelete) "non-existing" else "existed"
     test(s"CTAS for external data source table with a $tcName location") {
       withTable("t", "t1") {
-        withTempDir {
-          dir =>
-            if (shouldDelete) {
-              dir.delete()
-            }
-            spark.sql(
-              s"""
-                 |CREATE TABLE t
-                 |USING parquet
-                 |LOCATION '$dir'
-                 |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
-               """.stripMargin)
-            val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-            assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+        withTempDir { dir =>
+          if (shouldDelete) dir.delete()
+          spark.sql(
+            s"""
+               |CREATE TABLE t
+               |USING parquet
+               |LOCATION '$dir'
+               |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+             """.stripMargin)
+          val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+          assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
 
-            checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
+          checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
         }
         // partition table
-        withTempDir {
-          dir =>
-            if (shouldDelete) {
-              dir.delete()
-            }
-            spark.sql(
-              s"""
-                 |CREATE TABLE t1
-                 |USING parquet
-                 |PARTITIONED BY(a, b)
-                 |LOCATION '$dir'
-                 |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
-               """.stripMargin)
-            val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
-            assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
-
-            val partDir = new File(dir, "a=3")
-            assert(partDir.exists())
-
-            checkAnswer(spark.table("t1"), Row(1, 2, 3, 4))
+        withTempDir { dir =>
+          if (shouldDelete) dir.delete()
+          spark.sql(
+            s"""
+               |CREATE TABLE t1
+               |USING parquet
+               |PARTITIONED BY(a, b)
+               |LOCATION '$dir'
+               |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+             """.stripMargin)
+          val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+          assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+
+          val partDir = new File(dir, "a=3")
+          assert(partDir.exists())
+
+          checkAnswer(spark.table("t1"), Row(1, 2, 3, 4))
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/f6fdf92d/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 79ad156..d29242b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1663,43 +1663,73 @@ class HiveDDLSuite
     }
   }
 
+  test("create hive table with a non-existing location") {
+    withTable("t", "t1") {
+      withTempPath { dir =>
+        spark.sql(s"CREATE TABLE t(a int, b int) USING hive LOCATION '$dir'")
+
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+
+        spark.sql("INSERT INTO TABLE t SELECT 1, 2")
+        assert(dir.exists())
+
+        checkAnswer(spark.table("t"), Row(1, 2))
+      }
+      // partition table
+      withTempPath { dir =>
+        spark.sql(
+          s"""
+             |CREATE TABLE t1(a int, b int)
+             |USING hive
+             |PARTITIONED BY(a)
+             |LOCATION '$dir'
+           """.stripMargin)
+
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+        assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+
+        spark.sql("INSERT INTO TABLE t1 PARTITION(a=1) SELECT 2")
+
+        val partDir = new File(dir, "a=1")
+        assert(partDir.exists())
+
+        checkAnswer(spark.table("t1"), Row(2, 1))
+      }
+    }
+  }
+
   Seq(true, false).foreach { shouldDelete =>
-    val tcName = if (shouldDelete) "non-existent" else "existed"
-    test(s"CTAS for external data source table with a $tcName location") {
+    val tcName = if (shouldDelete) "non-existing" else "existed"
+
+    test(s"CTAS for external hive table with a $tcName location") {
       withTable("t", "t1") {
-        withTempDir {
-          dir =>
-            if (shouldDelete) {
-              dir.delete()
-            }
+        withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+          withTempDir { dir =>
+            if (shouldDelete) dir.delete()
             spark.sql(
               s"""
                  |CREATE TABLE t
-                 |USING parquet
+                 |USING hive
                  |LOCATION '$dir'
                  |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
                """.stripMargin)
-
             val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
             assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
 
             checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
-        }
-        // partition table
-        withTempDir {
-          dir =>
-            if (shouldDelete) {
-              dir.delete()
-            }
+          }
+          // partition table
+          withTempDir { dir =>
+            if (shouldDelete) dir.delete()
             spark.sql(
               s"""
                  |CREATE TABLE t1
-                 |USING parquet
+                 |USING hive
                  |PARTITIONED BY(a, b)
                  |LOCATION '$dir'
                  |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
                """.stripMargin)
-
             val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
             assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
 
@@ -1707,51 +1737,6 @@ class HiveDDLSuite
             assert(partDir.exists())
 
             checkAnswer(spark.table("t1"), Row(1, 2, 3, 4))
-        }
-      }
-    }
-
-    test(s"CTAS for external hive table with a $tcName location") {
-      withTable("t", "t1") {
-        withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
-          withTempDir {
-            dir =>
-              if (shouldDelete) {
-                dir.delete()
-              }
-              spark.sql(
-                s"""
-                   |CREATE TABLE t
-                   |USING hive
-                   |LOCATION '$dir'
-                   |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
-                 """.stripMargin)
-              val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-              assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
-
-              checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
-          }
-          // partition table
-          withTempDir {
-            dir =>
-              if (shouldDelete) {
-                dir.delete()
-              }
-              spark.sql(
-                s"""
-                   |CREATE TABLE t1
-                   |USING hive
-                   |PARTITIONED BY(a, b)
-                   |LOCATION '$dir'
-                   |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
-                 """.stripMargin)
-              val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
-              assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
-
-              val partDir = new File(dir, "a=3")
-              assert(partDir.exists())
-
-              checkAnswer(spark.table("t1"), Row(1, 2, 3, 4))
           }
         }
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org