Posted to commits@spark.apache.org by we...@apache.org on 2017/03/09 09:18:21 UTC

spark git commit: [SPARK-19763][SQL] qualified external datasource table location stored in catalog

Repository: spark
Updated Branches:
  refs/heads/master eeb1d6db8 -> 274973d2a


[SPARK-19763][SQL] qualified external datasource table location stored in catalog

## What changes were proposed in this pull request?

If we create an external datasource table with a non-qualified location, we should qualify it before storing it in the catalog.

```
CREATE TABLE t(a string)
USING parquet
LOCATION '/path/xx'

CREATE TABLE t1(a string, b string)
USING parquet
PARTITIONED BY(b)
LOCATION '/path/xx'
```

When we get the table back from the catalog, the location should be qualified, e.g. 'file:/path/xx'.
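
As a rough illustration of what "qualifying" a location means here, the sketch below mirrors the `makeQualifiedPath` helper this patch relies on (the standalone Hadoop `Configuration` is an assumption of the sketch; Spark resolves paths against the session's Hadoop configuration):

```scala
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Qualify a raw path against its FileSystem: the default scheme and
// authority (e.g. "file:") are filled in when missing.
def makeQualifiedPath(path: String): URI = {
  val hadoopPath = new Path(path)
  val fs = hadoopPath.getFileSystem(new Configuration())
  fs.makeQualified(hadoopPath).toUri
}

// On a local filesystem: makeQualifiedPath("/path/xx") == new URI("file:/path/xx")
```
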
## How was this patch tested?
Unit tests were added.

Author: windpiger <so...@outlook.com>

Closes #17095 from windpiger/tablepathQualified.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/274973d2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/274973d2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/274973d2

Branch: refs/heads/master
Commit: 274973d2a32ff4eb28545b50a3135e4784eb3047
Parents: eeb1d6d
Author: windpiger <so...@outlook.com>
Authored: Thu Mar 9 01:18:17 2017 -0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Mar 9 01:18:17 2017 -0800

----------------------------------------------------------------------
 .../sql/catalyst/catalog/SessionCatalog.scala   | 14 +++++-
 .../spark/sql/execution/command/DDLSuite.scala  | 50 ++++++++++++++++----
 .../spark/sql/internal/CatalogSuite.scala       |  3 +-
 .../spark/sql/sources/PathOptionSuite.scala     |  9 ++--
 .../apache/spark/sql/test/SQLTestUtils.scala    |  5 --
 .../sql/hive/HiveMetastoreCatalogSuite.scala    |  4 +-
 .../spark/sql/hive/client/VersionsSuite.scala   |  6 +--
 .../spark/sql/hive/execution/HiveDDLSuite.scala |  4 +-
 8 files changed, 64 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/274973d2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 6cfc4a4..bfcdb70 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -259,7 +259,19 @@ class SessionCatalog(
     val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableDefinition.identifier.table)
     validateName(table)
-    val newTableDefinition = tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))
+
+    val newTableDefinition = if (tableDefinition.storage.locationUri.isDefined
+      && !tableDefinition.storage.locationUri.get.isAbsolute) {
+      // make the location of the table qualified.
+      val qualifiedTableLocation =
+        makeQualifiedPath(tableDefinition.storage.locationUri.get)
+      tableDefinition.copy(
+        storage = tableDefinition.storage.copy(locationUri = Some(qualifiedTableLocation)),
+        identifier = TableIdentifier(table, Some(db)))
+    } else {
+      tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))
+    }
+
     requireDbExists(db)
     externalCatalog.createTable(newTableDefinition, ignoreIfExists)
   }
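
A note on the guard in the hunk above: java.net.URI.isAbsolute is true exactly when the URI carries a scheme, so a bare filesystem path is rewritten while an already-qualified location is left untouched. A minimal illustration:

```scala
import java.net.URI

// A URI is "absolute" iff it has a scheme component.
new URI("/path/xx").isAbsolute      // false -> the location gets qualified
new URI("file:/path/xx").isAbsolute // true  -> the location is kept as-is
```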

http://git-wip-us.apache.org/repos/asf/spark/blob/274973d2/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index aa335c4..5f70a8c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -230,8 +230,8 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
   }
 
   private def getDBPath(dbName: String): URI = {
-    val warehousePath = s"file:${spark.sessionState.conf.warehousePath.stripPrefix("file:")}"
-    new Path(warehousePath, s"$dbName.db").toUri
+    val warehousePath = makeQualifiedPath(s"${spark.sessionState.conf.warehousePath}")
+    new Path(CatalogUtils.URIToString(warehousePath), s"$dbName.db").toUri
   }
 
   test("the qualified path of a database is stored in the catalog") {
@@ -1360,7 +1360,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     val partitionLocation = if (isUsingHiveMetastore) {
       val tableLocation = catalog.getTableMetadata(tableIdent).storage.locationUri
       assert(tableLocation.isDefined)
-      makeQualifiedPath(new Path(tableLocation.get.toString, "paris"))
+      makeQualifiedPath(new Path(tableLocation.get.toString, "paris").toString)
     } else {
       new URI("paris")
     }
@@ -1909,7 +1909,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
              |OPTIONS(path "$dir")
            """.stripMargin)
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-        assert(table.location == new URI(dir.getAbsolutePath))
+        assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
 
         dir.delete
         assert(!dir.exists)
@@ -1950,7 +1950,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
              |LOCATION "$dir"
            """.stripMargin)
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-        assert(table.location == new URI(dir.getAbsolutePath))
+        assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
 
         spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
         checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)
@@ -1976,7 +1976,8 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
              |OPTIONS(path "$dir")
            """.stripMargin)
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-        assert(table.location == new URI(dir.getAbsolutePath))
+
+        assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
 
         dir.delete()
         checkAnswer(spark.table("t"), Nil)
@@ -2032,7 +2033,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
                  |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
                """.stripMargin)
             val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-            assert(table.location == new URI(dir.getAbsolutePath))
+            assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
 
             checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
         }
@@ -2051,7 +2052,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
                  |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
                """.stripMargin)
             val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
-            assert(table.location == new URI(dir.getAbsolutePath))
+            assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
 
             val partDir = new File(dir, "a=3")
             assert(partDir.exists())
@@ -2099,7 +2100,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
              """.stripMargin)
 
           val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-          assert(table.location == new Path(loc.getAbsolutePath).toUri)
+          assert(table.location == makeQualifiedPath(loc.getAbsolutePath))
           assert(new Path(table.location).toString.contains(specialChars))
 
           assert(loc.listFiles().isEmpty)
@@ -2120,7 +2121,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
              """.stripMargin)
 
           val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
-          assert(table.location == new Path(loc.getAbsolutePath).toUri)
+          assert(table.location == makeQualifiedPath(loc.getAbsolutePath))
           assert(new Path(table.location).toString.contains(specialChars))
 
           assert(loc.listFiles().isEmpty)
@@ -2162,4 +2163,33 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
       }
     }
   }
+
+  test("the qualified path of a datasource table is stored in the catalog") {
+    withTable("t", "t1") {
+      withTempDir { dir =>
+        assert(!dir.getAbsolutePath.startsWith("file:/"))
+        spark.sql(
+          s"""
+             |CREATE TABLE t(a string)
+             |USING parquet
+             |LOCATION '$dir'
+           """.stripMargin)
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        assert(table.location.toString.startsWith("file:/"))
+      }
+
+      withTempDir { dir =>
+        assert(!dir.getAbsolutePath.startsWith("file:/"))
+        spark.sql(
+          s"""
+             |CREATE TABLE t1(a string, b string)
+             |USING parquet
+             |PARTITIONED BY(b)
+             |LOCATION '$dir'
+           """.stripMargin)
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+        assert(table.location.toString.startsWith("file:/"))
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/274973d2/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index fcb8ffb..9742b3b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.internal
 
 import java.io.File
-import java.net.URI
 
 import org.scalatest.BeforeAndAfterEach
 
@@ -459,7 +458,7 @@ class CatalogSuite
           options = Map("path" -> dir.getAbsolutePath))
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
         assert(table.tableType == CatalogTableType.EXTERNAL)
-        assert(table.storage.locationUri.get == new URI(dir.getAbsolutePath))
+        assert(table.storage.locationUri.get == makeQualifiedPath(dir.getAbsolutePath))
 
         Seq((1)).toDF("i").write.insertInto("t")
         assert(dir.exists() && dir.listFiles().nonEmpty)

http://git-wip-us.apache.org/repos/asf/spark/blob/274973d2/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
index 7ab339e..60adee4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
@@ -75,7 +75,7 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext {
            |USING ${classOf[TestOptionsSource].getCanonicalName}
            |OPTIONS (PATH '/tmp/path')
         """.stripMargin)
-      assert(getPathOption("src") == Some("/tmp/path"))
+      assert(getPathOption("src") == Some("file:/tmp/path"))
     }
 
     // should exist even path option is not specified when creating table
@@ -88,15 +88,16 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext {
   test("path option also exist for write path") {
     withTable("src") {
       withTempPath { p =>
-        val path = new Path(p.getAbsolutePath).toString
         sql(
           s"""
             |CREATE TABLE src
             |USING ${classOf[TestOptionsSource].getCanonicalName}
-            |OPTIONS (PATH '$path')
+            |OPTIONS (PATH '$p')
             |AS SELECT 1
           """.stripMargin)
-        assert(spark.table("src").schema.head.metadata.getString("path") == path)
+        assert(CatalogUtils.stringToURI(
+          spark.table("src").schema.head.metadata.getString("path")) ==
+          makeQualifiedPath(p.getAbsolutePath))
       }
     }
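
The assertion above round-trips the stored path string through CatalogUtils so that URIs, not raw strings, are compared. A small sketch of that round-trip (the literal values are illustrative and assume a local filesystem):

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogUtils

// stringToURI parses the string form kept in table metadata;
// URIToString is its inverse. Comparing URIs avoids mismatches
// in raw string formatting.
val uri = CatalogUtils.stringToURI("file:/tmp/path")
val str = CatalogUtils.URIToString(uri) // "file:/tmp/path"
```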
 

http://git-wip-us.apache.org/repos/asf/spark/blob/274973d2/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index 12fc899..9201954 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -306,11 +306,6 @@ private[sql] trait SQLTestUtils
     val fs = hadoopPath.getFileSystem(spark.sessionState.newHadoopConf())
     fs.makeQualified(hadoopPath).toUri
   }
-
-  def makeQualifiedPath(path: Path): URI = {
-    val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
-    fs.makeQualified(path).toUri
-  }
 }
 
 private[sql] object SQLTestUtils {

http://git-wip-us.apache.org/repos/asf/spark/blob/274973d2/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index cf552b4..079358b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.hive
 
-import java.net.URI
-
 import org.apache.spark.sql.{QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType
@@ -142,7 +140,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
           assert(hiveTable.storage.serde === Some(serde))
 
           assert(hiveTable.tableType === CatalogTableType.EXTERNAL)
-          assert(hiveTable.storage.locationUri === Some(new URI(path.getAbsolutePath)))
+          assert(hiveTable.storage.locationUri === Some(makeQualifiedPath(dir.getAbsolutePath)))
 
           val columns = hiveTable.schema
           assert(columns.map(_.name) === Seq("d1", "d2"))

http://git-wip-us.apache.org/repos/asf/spark/blob/274973d2/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index dd624ec..6025f8a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -658,19 +658,17 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
 
         val tPath = new Path(spark.sessionState.conf.warehousePath, "t")
         Seq("1").toDF("a").write.saveAsTable("t")
-        val expectedPath = s"file:${tPath.toUri.getPath.stripSuffix("/")}"
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
 
-        assert(table.location == CatalogUtils.stringToURI(expectedPath))
+        assert(table.location == makeQualifiedPath(tPath.toString))
         assert(tPath.getFileSystem(spark.sessionState.newHadoopConf()).exists(tPath))
         checkAnswer(spark.table("t"), Row("1") :: Nil)
 
         val t1Path = new Path(spark.sessionState.conf.warehousePath, "t1")
         spark.sql("create table t1 using parquet as select 2 as a")
         val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
-        val expectedPath1 = s"file:${t1Path.toUri.getPath.stripSuffix("/")}"
 
-        assert(table1.location == CatalogUtils.stringToURI(expectedPath1))
+        assert(table1.location == makeQualifiedPath(t1Path.toString))
         assert(t1Path.getFileSystem(spark.sessionState.newHadoopConf()).exists(t1Path))
         checkAnswer(spark.table("t1"), Row(2) :: Nil)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/274973d2/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index fce0550..23aea24 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1681,7 +1681,7 @@ class HiveDDLSuite
                """.stripMargin)
 
             val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-            assert(table.location == new URI(dir.getAbsolutePath))
+            assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
 
             checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
         }
@@ -1701,7 +1701,7 @@ class HiveDDLSuite
                """.stripMargin)
 
             val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
-            assert(table.location == new URI(dir.getAbsolutePath))
+            assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
 
             val partDir = new File(dir, "a=3")
             assert(partDir.exists())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org