You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/03/09 09:18:21 UTC
spark git commit: [SPARK-19763][SQL] qualified external datasource table location stored in catalog
Repository: spark
Updated Branches:
refs/heads/master eeb1d6db8 -> 274973d2a
[SPARK-19763][SQL] qualified external datasource table location stored in catalog
## What changes were proposed in this pull request?
If we create an external datasource table with a non-qualified location, we should qualify the location before storing it in the catalog.
```
CREATE TABLE t(a string)
USING parquet
LOCATION '/path/xx'
CREATE TABLE t1(a string, b string)
USING parquet
PARTITIONED BY(b)
LOCATION '/path/xx'
```
When we get the table from the catalog, the location should be qualified, e.g. 'file:/path/xx'.
## How was this patch tested?
Unit test added; existing tests updated to expect qualified locations.
Author: windpiger <so...@outlook.com>
Closes #17095 from windpiger/tablepathQualified.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/274973d2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/274973d2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/274973d2
Branch: refs/heads/master
Commit: 274973d2a32ff4eb28545b50a3135e4784eb3047
Parents: eeb1d6d
Author: windpiger <so...@outlook.com>
Authored: Thu Mar 9 01:18:17 2017 -0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Mar 9 01:18:17 2017 -0800
----------------------------------------------------------------------
.../sql/catalyst/catalog/SessionCatalog.scala | 14 +++++-
.../spark/sql/execution/command/DDLSuite.scala | 50 ++++++++++++++++----
.../spark/sql/internal/CatalogSuite.scala | 3 +-
.../spark/sql/sources/PathOptionSuite.scala | 9 ++--
.../apache/spark/sql/test/SQLTestUtils.scala | 5 --
.../sql/hive/HiveMetastoreCatalogSuite.scala | 4 +-
.../spark/sql/hive/client/VersionsSuite.scala | 6 +--
.../spark/sql/hive/execution/HiveDDLSuite.scala | 4 +-
8 files changed, 64 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/274973d2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 6cfc4a4..bfcdb70 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -259,7 +259,19 @@ class SessionCatalog(
val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableDefinition.identifier.table)
validateName(table)
- val newTableDefinition = tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))
+
+ val newTableDefinition = if (tableDefinition.storage.locationUri.isDefined
+ && !tableDefinition.storage.locationUri.get.isAbsolute) {
+ // make the location of the table qualified.
+ val qualifiedTableLocation =
+ makeQualifiedPath(tableDefinition.storage.locationUri.get)
+ tableDefinition.copy(
+ storage = tableDefinition.storage.copy(locationUri = Some(qualifiedTableLocation)),
+ identifier = TableIdentifier(table, Some(db)))
+ } else {
+ tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))
+ }
+
requireDbExists(db)
externalCatalog.createTable(newTableDefinition, ignoreIfExists)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/274973d2/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index aa335c4..5f70a8c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -230,8 +230,8 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}
private def getDBPath(dbName: String): URI = {
- val warehousePath = s"file:${spark.sessionState.conf.warehousePath.stripPrefix("file:")}"
- new Path(warehousePath, s"$dbName.db").toUri
+ val warehousePath = makeQualifiedPath(s"${spark.sessionState.conf.warehousePath}")
+ new Path(CatalogUtils.URIToString(warehousePath), s"$dbName.db").toUri
}
test("the qualified path of a database is stored in the catalog") {
@@ -1360,7 +1360,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
val partitionLocation = if (isUsingHiveMetastore) {
val tableLocation = catalog.getTableMetadata(tableIdent).storage.locationUri
assert(tableLocation.isDefined)
- makeQualifiedPath(new Path(tableLocation.get.toString, "paris"))
+ makeQualifiedPath(new Path(tableLocation.get.toString, "paris").toString)
} else {
new URI("paris")
}
@@ -1909,7 +1909,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
|OPTIONS(path "$dir")
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
- assert(table.location == new URI(dir.getAbsolutePath))
+ assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
dir.delete
assert(!dir.exists)
@@ -1950,7 +1950,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
|LOCATION "$dir"
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
- assert(table.location == new URI(dir.getAbsolutePath))
+ assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)
@@ -1976,7 +1976,8 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
|OPTIONS(path "$dir")
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
- assert(table.location == new URI(dir.getAbsolutePath))
+
+ assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
dir.delete()
checkAnswer(spark.table("t"), Nil)
@@ -2032,7 +2033,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
|AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
- assert(table.location == new URI(dir.getAbsolutePath))
+ assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
}
@@ -2051,7 +2052,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
|AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
- assert(table.location == new URI(dir.getAbsolutePath))
+ assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
val partDir = new File(dir, "a=3")
assert(partDir.exists())
@@ -2099,7 +2100,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
- assert(table.location == new Path(loc.getAbsolutePath).toUri)
+ assert(table.location == makeQualifiedPath(loc.getAbsolutePath))
assert(new Path(table.location).toString.contains(specialChars))
assert(loc.listFiles().isEmpty)
@@ -2120,7 +2121,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
- assert(table.location == new Path(loc.getAbsolutePath).toUri)
+ assert(table.location == makeQualifiedPath(loc.getAbsolutePath))
assert(new Path(table.location).toString.contains(specialChars))
assert(loc.listFiles().isEmpty)
@@ -2162,4 +2163,33 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}
}
}
+
+ test("the qualified path of a datasource table is stored in the catalog") {
+ withTable("t", "t1") {
+ withTempDir { dir =>
+ assert(!dir.getAbsolutePath.startsWith("file:/"))
+ spark.sql(
+ s"""
+ |CREATE TABLE t(a string)
+ |USING parquet
+ |LOCATION '$dir'
+ """.stripMargin)
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+ assert(table.location.toString.startsWith("file:/"))
+ }
+
+ withTempDir { dir =>
+ assert(!dir.getAbsolutePath.startsWith("file:/"))
+ spark.sql(
+ s"""
+ |CREATE TABLE t1(a string, b string)
+ |USING parquet
+ |PARTITIONED BY(b)
+ |LOCATION '$dir'
+ """.stripMargin)
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+ assert(table.location.toString.startsWith("file:/"))
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/274973d2/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index fcb8ffb..9742b3b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql.internal
import java.io.File
-import java.net.URI
import org.scalatest.BeforeAndAfterEach
@@ -459,7 +458,7 @@ class CatalogSuite
options = Map("path" -> dir.getAbsolutePath))
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table.tableType == CatalogTableType.EXTERNAL)
- assert(table.storage.locationUri.get == new URI(dir.getAbsolutePath))
+ assert(table.storage.locationUri.get == makeQualifiedPath(dir.getAbsolutePath))
Seq((1)).toDF("i").write.insertInto("t")
assert(dir.exists() && dir.listFiles().nonEmpty)
http://git-wip-us.apache.org/repos/asf/spark/blob/274973d2/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
index 7ab339e..60adee4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
@@ -75,7 +75,7 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext {
|USING ${classOf[TestOptionsSource].getCanonicalName}
|OPTIONS (PATH '/tmp/path')
""".stripMargin)
- assert(getPathOption("src") == Some("/tmp/path"))
+ assert(getPathOption("src") == Some("file:/tmp/path"))
}
// should exist even path option is not specified when creating table
@@ -88,15 +88,16 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext {
test("path option also exist for write path") {
withTable("src") {
withTempPath { p =>
- val path = new Path(p.getAbsolutePath).toString
sql(
s"""
|CREATE TABLE src
|USING ${classOf[TestOptionsSource].getCanonicalName}
- |OPTIONS (PATH '$path')
+ |OPTIONS (PATH '$p')
|AS SELECT 1
""".stripMargin)
- assert(spark.table("src").schema.head.metadata.getString("path") == path)
+ assert(CatalogUtils.stringToURI(
+ spark.table("src").schema.head.metadata.getString("path")) ==
+ makeQualifiedPath(p.getAbsolutePath))
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/274973d2/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index 12fc899..9201954 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -306,11 +306,6 @@ private[sql] trait SQLTestUtils
val fs = hadoopPath.getFileSystem(spark.sessionState.newHadoopConf())
fs.makeQualified(hadoopPath).toUri
}
-
- def makeQualifiedPath(path: Path): URI = {
- val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
- fs.makeQualified(path).toUri
- }
}
private[sql] object SQLTestUtils {
http://git-wip-us.apache.org/repos/asf/spark/blob/274973d2/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index cf552b4..079358b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.hive
-import java.net.URI
-
import org.apache.spark.sql.{QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
@@ -142,7 +140,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
assert(hiveTable.storage.serde === Some(serde))
assert(hiveTable.tableType === CatalogTableType.EXTERNAL)
- assert(hiveTable.storage.locationUri === Some(new URI(path.getAbsolutePath)))
+ assert(hiveTable.storage.locationUri === Some(makeQualifiedPath(dir.getAbsolutePath)))
val columns = hiveTable.schema
assert(columns.map(_.name) === Seq("d1", "d2"))
http://git-wip-us.apache.org/repos/asf/spark/blob/274973d2/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index dd624ec..6025f8a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -658,19 +658,17 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
val tPath = new Path(spark.sessionState.conf.warehousePath, "t")
Seq("1").toDF("a").write.saveAsTable("t")
- val expectedPath = s"file:${tPath.toUri.getPath.stripSuffix("/")}"
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
- assert(table.location == CatalogUtils.stringToURI(expectedPath))
+ assert(table.location == makeQualifiedPath(tPath.toString))
assert(tPath.getFileSystem(spark.sessionState.newHadoopConf()).exists(tPath))
checkAnswer(spark.table("t"), Row("1") :: Nil)
val t1Path = new Path(spark.sessionState.conf.warehousePath, "t1")
spark.sql("create table t1 using parquet as select 2 as a")
val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
- val expectedPath1 = s"file:${t1Path.toUri.getPath.stripSuffix("/")}"
- assert(table1.location == CatalogUtils.stringToURI(expectedPath1))
+ assert(table1.location == makeQualifiedPath(t1Path.toString))
assert(t1Path.getFileSystem(spark.sessionState.newHadoopConf()).exists(t1Path))
checkAnswer(spark.table("t1"), Row(2) :: Nil)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/274973d2/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index fce0550..23aea24 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1681,7 +1681,7 @@ class HiveDDLSuite
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
- assert(table.location == new URI(dir.getAbsolutePath))
+ assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
}
@@ -1701,7 +1701,7 @@ class HiveDDLSuite
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
- assert(table.location == new URI(dir.getAbsolutePath))
+ assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
val partDir = new File(dir, "a=3")
assert(partDir.exists())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org