Posted to commits@spark.apache.org by yh...@apache.org on 2016/11/03 01:05:18 UTC

spark git commit: [SPARK-17470][SQL] unify path for data source table and locationUri for hive serde table

Repository: spark
Updated Branches:
  refs/heads/master fd90541c3 -> 3a1bc6f47


[SPARK-17470][SQL] unify path for data source table and locationUri for hive serde table

## What changes were proposed in this pull request?

Due to a limitation of the Hive metastore (a table location must be a directory path, not a file path), we have always stored the `path` of a data source table in its storage properties instead of in the `locationUri` field. However, we should not expose this difference at the `CatalogTable` level; it should stay an internal workaround in `HiveExternalCatalog`, just as we store the table schema of a data source table in table properties.

This PR unifies `path` and `locationUri` outside of `HiveExternalCatalog`: both data source tables and Hive serde tables now use the `locationUri` field.

This PR also unifies how the default location of a managed table is handled. Previously, the default location of a managed Hive serde table was set by the external catalog, while that of a managed data source table was set by the command. After this PR we follow the Hive approach: the default table location is always set by the external catalog.

For managed non-file-based tables, we assign a default table location and create an empty directory for it; the directory is removed when the table is dropped. This is reasonable, as the metastore does not care whether a table is file-based or not, and an empty table directory does no harm.
For external non-file-based tables, we would ideally omit the table location entirely, but due to a Hive metastore issue we assign a random location and remove it right after the table is created (see SPARK-15269 for details). This is fine, as the workaround is well isolated in `HiveExternalCatalog`.

To keep the existing behaviour of the `path` option, this PR always adds `locationUri` to the storage properties under the key `path` before passing them to `DataSource` as data source options.
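
As a user-facing illustration of this contract (not part of the patch; the table names and paths below are made up, and a local SparkSession is assumed), the presence or absence of the `path` option decides between an EXTERNAL and a MANAGED table, and the location is always surfaced through `locationUri`:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").getOrCreate()

// With a `path` option the table is EXTERNAL and the path becomes its locationUri.
spark.range(10).write.format("parquet").option("path", "/tmp/example_tbl").saveAsTable("example_tbl")

// Without a `path` option the table is MANAGED and the external catalog assigns
// the default location under the database directory.
spark.range(10).write.format("parquet").saveAsTable("managed_tbl")
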
## How was this patch tested?

Existing tests.

Author: Wenchen Fan <we...@databricks.com>

Closes #15024 from cloud-fan/path.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3a1bc6f4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3a1bc6f4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3a1bc6f4

Branch: refs/heads/master
Commit: 3a1bc6f4780f8384c1211b1335e7394a4a28377e
Parents: fd90541
Author: Wenchen Fan <we...@databricks.com>
Authored: Wed Nov 2 18:05:14 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Wed Nov 2 18:05:14 2016 -0700

----------------------------------------------------------------------
 R/pkg/inst/tests/testthat/test_sparkSQL.R       |   4 +-
 .../sql/catalyst/catalog/InMemoryCatalog.scala  |  40 ++-
 .../org/apache/spark/sql/DataFrameWriter.scala  |   5 +-
 .../spark/sql/execution/SparkSqlParser.scala    |  17 +-
 .../command/createDataSourceTables.scala        |  37 +--
 .../spark/sql/execution/command/ddl.scala       |  23 +-
 .../spark/sql/execution/command/tables.scala    |  50 ++--
 .../sql/execution/datasources/DataSource.scala  | 241 ++++++++++---------
 .../datasources/DataSourceStrategy.scala        |   3 +-
 .../apache/spark/sql/internal/CatalogImpl.scala |   4 +-
 .../spark/sql/execution/command/DDLSuite.scala  |   1 -
 .../spark/sql/sources/PathOptionSuite.scala     | 136 +++++++++++
 .../spark/sql/hive/HiveExternalCatalog.scala    | 227 +++++++++++------
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  16 +-
 .../sql/hive/HiveMetastoreCatalogSuite.scala    |   3 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala    |  28 ++-
 .../spark/sql/hive/MultiDatabaseSuite.scala     |   2 +-
 .../spark/sql/hive/execution/HiveDDLSuite.scala |  14 +-
 .../sql/hive/execution/SQLQuerySuite.scala      |   4 +-
 19 files changed, 520 insertions(+), 335 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3a1bc6f4/R/pkg/inst/tests/testthat/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index d7fe6b3..ee48baa 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2659,7 +2659,7 @@ test_that("Call DataFrameWriter.save() API in Java without path and check argume
   # It makes sure that we can omit path argument in write.df API and then it calls
   # DataFrameWriter.save() without path.
   expect_error(write.df(df, source = "csv"),
-               "Error in save : illegal argument - 'path' is not specified")
+              "Error in save : illegal argument - Expected exactly one path to be specified")
   expect_error(write.json(df, jsonPath),
               "Error in json : analysis error - path file:.*already exists")
   expect_error(write.text(df, jsonPath),
@@ -2667,7 +2667,7 @@ test_that("Call DataFrameWriter.save() API in Java without path and check argume
   expect_error(write.orc(df, jsonPath),
               "Error in orc : analysis error - path file:.*already exists")
   expect_error(write.parquet(df, jsonPath),
-                            "Error in parquet : analysis error - path file:.*already exists")
+              "Error in parquet : analysis error - path file:.*already exists")
 
   # Arguments checking in R side.
   expect_error(write.df(df, "data.tmp", source = c(1, 2)),

http://git-wip-us.apache.org/repos/asf/spark/blob/3a1bc6f4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index f95c9f8..ea675b7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -196,18 +196,32 @@ class InMemoryCatalog(
         throw new TableAlreadyExistsException(db = db, table = table)
       }
     } else {
-      if (tableDefinition.tableType == CatalogTableType.MANAGED) {
-        val dir = new Path(catalog(db).db.locationUri, table)
+      // Set the default table location if this is a managed table and its location is not
+      // specified.
+      // Ideally we should not create a managed table with a location, but a Hive serde table can
+      // specify a location for a managed table. And in [[CreateDataSourceTableAsSelectCommand]] we
+      // have to create the table directory and write out data before we create this table, to avoid
+      // exposing a partially written table.
+      val needDefaultTableLocation =
+        tableDefinition.tableType == CatalogTableType.MANAGED &&
+          tableDefinition.storage.locationUri.isEmpty
+
+      val tableWithLocation = if (needDefaultTableLocation) {
+        val defaultTableLocation = new Path(catalog(db).db.locationUri, table)
         try {
-          val fs = dir.getFileSystem(hadoopConfig)
-          fs.mkdirs(dir)
+          val fs = defaultTableLocation.getFileSystem(hadoopConfig)
+          fs.mkdirs(defaultTableLocation)
         } catch {
           case e: IOException =>
             throw new SparkException(s"Unable to create table $table as failed " +
-              s"to create its directory $dir", e)
+              s"to create its directory $defaultTableLocation", e)
         }
+        tableDefinition.withNewStorage(locationUri = Some(defaultTableLocation.toUri.toString))
+      } else {
+        tableDefinition
       }
-      catalog(db).tables.put(table, new TableDesc(tableDefinition))
+
+      catalog(db).tables.put(table, new TableDesc(tableWithLocation))
     }
   }
 
@@ -218,8 +232,12 @@ class InMemoryCatalog(
       purge: Boolean): Unit = synchronized {
     requireDbExists(db)
     if (tableExists(db, table)) {
-      if (getTable(db, table).tableType == CatalogTableType.MANAGED) {
-        val dir = new Path(catalog(db).db.locationUri, table)
+      val tableMeta = getTable(db, table)
+      if (tableMeta.tableType == CatalogTableType.MANAGED) {
+        assert(tableMeta.storage.locationUri.isDefined,
+          "Managed table should always have table location, as we will assign a default location " +
+            "to it if it doesn't have one.")
+        val dir = new Path(tableMeta.storage.locationUri.get)
         try {
           val fs = dir.getFileSystem(hadoopConfig)
           fs.delete(dir, true)
@@ -244,7 +262,10 @@ class InMemoryCatalog(
     oldDesc.table = oldDesc.table.copy(identifier = TableIdentifier(newName, Some(db)))
 
     if (oldDesc.table.tableType == CatalogTableType.MANAGED) {
-      val oldDir = new Path(catalog(db).db.locationUri, oldName)
+      assert(oldDesc.table.storage.locationUri.isDefined,
+        "Managed table should always have table location, as we will assign a default location " +
+          "to it if it doesn't have one.")
+      val oldDir = new Path(oldDesc.table.storage.locationUri.get)
       val newDir = new Path(catalog(db).db.locationUri, newName)
       try {
         val fs = oldDir.getFileSystem(hadoopConfig)
@@ -254,6 +275,7 @@ class InMemoryCatalog(
           throw new SparkException(s"Unable to rename table $oldName to $newName as failed " +
             s"to rename its directory $oldDir", e)
       }
+      oldDesc.table = oldDesc.table.withNewStorage(locationUri = Some(newDir.toUri.toString))
     }
 
     catalog(db).tables.put(newName, oldDesc)
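
A hedged restatement of the default-location logic above, using only the Hadoop FS API (error handling and the surrounding catalog bookkeeping are elided; the helper name is made up):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Compute and create the default directory for a managed table; the resulting
// URI string is what ends up in CatalogStorageFormat.locationUri.
def defaultTableLocation(dbLocationUri: String, table: String, hadoopConf: Configuration): String = {
  val dir = new Path(dbLocationUri, table)
  val fs = dir.getFileSystem(hadoopConf)
  fs.mkdirs(dir)
  dir.toUri.toString
}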

http://git-wip-us.apache.org/repos/asf/spark/blob/3a1bc6f4/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 700f483..f95362e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -373,7 +373,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         throw new AnalysisException(s"Table $tableIdent already exists.")
 
       case _ =>
-        val tableType = if (new CaseInsensitiveMap(extraOptions.toMap).contains("path")) {
+        val storage = DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
+        val tableType = if (storage.locationUri.isDefined) {
           CatalogTableType.EXTERNAL
         } else {
           CatalogTableType.MANAGED
@@ -382,7 +383,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         val tableDesc = CatalogTable(
           identifier = tableIdent,
           tableType = tableType,
-          storage = CatalogStorageFormat.empty.copy(properties = extraOptions.toMap),
+          storage = storage,
           schema = new StructType,
           provider = Some(source),
           partitionColumnNames = partitioningColumns.getOrElse(Nil),
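
The decision above (shared with the SQL parser change below) reduces to a single predicate on the parsed storage format. A minimal sketch with stand-in types rather than Spark's internal classes:

// Stand-ins for CatalogStorageFormat / CatalogTableType, for illustration only.
case class StorageFormat(locationUri: Option[String], properties: Map[String, String])

sealed trait TableType
case object External extends TableType
case object Managed extends TableType

// A table is EXTERNAL iff the storage format carries a location; the raw
// options map is no longer probed for a "path" key.
def tableTypeOf(storage: StorageFormat): TableType =
  if (storage.locationUri.isDefined) External else Managed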

http://git-wip-us.apache.org/repos/asf/spark/blob/3a1bc6f4/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index fe183d0..634ffde 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -343,7 +343,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
 
     // TODO: this may be wrong for non file-based data source like JDBC, which should be external
     // even there is no `path` in options. We should consider allow the EXTERNAL keyword.
-    val tableType = if (new CaseInsensitiveMap(options).contains("path")) {
+    val storage = DataSource.buildStorageFormatFromOptions(options)
+    val tableType = if (storage.locationUri.isDefined) {
       CatalogTableType.EXTERNAL
     } else {
       CatalogTableType.MANAGED
@@ -352,7 +353,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     val tableDesc = CatalogTable(
       identifier = table,
       tableType = tableType,
-      storage = CatalogStorageFormat.empty.copy(properties = options),
+      storage = storage,
       schema = schema.getOrElse(new StructType),
       provider = Some(provider),
       partitionColumnNames = partitionColumnNames,
@@ -1062,17 +1063,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
         if (conf.convertCTAS && !hasStorageProperties) {
           // At here, both rowStorage.serdeProperties and fileStorage.serdeProperties
           // are empty Maps.
-          val optionsWithPath = if (location.isDefined) {
-            Map("path" -> location.get)
-          } else {
-            Map.empty[String, String]
-          }
-
           val newTableDesc = tableDesc.copy(
-            storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath),
-            provider = Some(conf.defaultDataSourceName)
-          )
-
+            storage = CatalogStorageFormat.empty.copy(locationUri = location),
+            provider = Some(conf.defaultDataSourceName))
           CreateTable(newTableDesc, mode, Some(q))
         } else {
           CreateTable(tableDesc, mode, Some(q))
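
An illustration of the converted-CTAS branch above (assumes an active session `spark`, that the configuration key `spark.sql.hive.convertCTAS` enables the conversion, and made-up names): a LOCATION clause now flows straight into `locationUri` instead of a synthesized `path` option.

spark.sql("SET spark.sql.hive.convertCTAS=true")
spark.sql("CREATE TABLE ctas_tbl LOCATION '/tmp/ctas_tbl' AS SELECT 1 AS i")
// The resulting CatalogTable has storage.locationUri == Some("/tmp/ctas_tbl")
// and provider == the default data source.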

http://git-wip-us.apache.org/repos/asf/spark/blob/3a1bc6f4/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 2a97431..d4b2827 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -57,13 +57,14 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
 
     // Create the relation to validate the arguments before writing the metadata to the metastore,
     // and infer the table schema and partition if users didn't specify schema in CREATE TABLE.
+    val pathOption = table.storage.locationUri.map("path" -> _)
     val dataSource: BaseRelation =
       DataSource(
         sparkSession = sparkSession,
         userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
         className = table.provider.get,
         bucketSpec = table.bucketSpec,
-        options = table.storage.properties).resolveRelation()
+        options = table.storage.properties ++ pathOption).resolveRelation()
 
     dataSource match {
       case fs: HadoopFsRelation =>
@@ -85,14 +86,7 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
       }
     }
 
-    val optionsWithPath = if (table.tableType == CatalogTableType.MANAGED) {
-      table.storage.properties + ("path" -> sessionState.catalog.defaultTablePath(table.identifier))
-    } else {
-      table.storage.properties
-    }
-
     val newTable = table.copy(
-      storage = table.storage.copy(properties = optionsWithPath),
       schema = dataSource.schema,
       partitionColumnNames = partitionColumnNames,
       // If metastore partition management for file source tables is enabled, we start off with
@@ -140,12 +134,6 @@ case class CreateDataSourceTableAsSelectCommand(
     val tableIdentWithDB = table.identifier.copy(database = Some(db))
     val tableName = tableIdentWithDB.unquotedString
 
-    val optionsWithPath = if (table.tableType == CatalogTableType.MANAGED) {
-      table.storage.properties + ("path" -> sessionState.catalog.defaultTablePath(table.identifier))
-    } else {
-      table.storage.properties
-    }
-
     var createMetastoreTable = false
     var existingSchema = Option.empty[StructType]
     if (sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) {
@@ -162,13 +150,7 @@ case class CreateDataSourceTableAsSelectCommand(
           return Seq.empty[Row]
         case SaveMode.Append =>
           // Check if the specified data source match the data source of the existing table.
-          val dataSource = DataSource(
-            sparkSession = sparkSession,
-            userSpecifiedSchema = Some(query.schema.asNullable),
-            partitionColumns = table.partitionColumnNames,
-            bucketSpec = table.bucketSpec,
-            className = provider,
-            options = optionsWithPath)
+          val existingProvider = DataSource.lookupDataSource(provider)
           // TODO: Check that options from the resolved relation match the relation that we are
           // inserting into (i.e. using the same compression).
 
@@ -178,7 +160,7 @@ case class CreateDataSourceTableAsSelectCommand(
             case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
               // check if the file formats match
               l.relation match {
-                case r: HadoopFsRelation if r.fileFormat.getClass != dataSource.providingClass =>
+                case r: HadoopFsRelation if r.fileFormat.getClass != existingProvider =>
                   throw new AnalysisException(
                     s"The file format of the existing table $tableName is " +
                       s"`${r.fileFormat.getClass.getName}`. It doesn't match the specified " +
@@ -213,13 +195,20 @@ case class CreateDataSourceTableAsSelectCommand(
       case None => data
     }
 
+    val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
+      Some(sessionState.catalog.defaultTablePath(table.identifier))
+    } else {
+      table.storage.locationUri
+    }
+
     // Create the relation based on the data of df.
+    val pathOption = tableLocation.map("path" -> _)
     val dataSource = DataSource(
       sparkSession,
       className = provider,
       partitionColumns = table.partitionColumnNames,
       bucketSpec = table.bucketSpec,
-      options = optionsWithPath)
+      options = table.storage.properties ++ pathOption)
 
     val result = try {
       dataSource.write(mode, df)
@@ -230,7 +219,7 @@ case class CreateDataSourceTableAsSelectCommand(
     }
     if (createMetastoreTable) {
       val newTable = table.copy(
-        storage = table.storage.copy(properties = optionsWithPath),
+        storage = table.storage.copy(locationUri = tableLocation),
         // We will use the schema of resolved.relation as the schema of the table (instead of
         // the schema of df). It is important since the nullability may be changed by the relation
         // provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
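
The location resolution above can be summarized in one hedged helper (the signature is a stand-in; Spark's real code inlines this):

// For a MANAGED table the catalog's default path wins; for an EXTERNAL table
// the user-declared locationUri (if any) is kept.
def resolveTableLocation(
    isManaged: Boolean,
    declaredLocation: Option[String],
    defaultTablePath: => String): Option[String] =
  if (isManaged) Some(defaultTablePath) else declaredLocation

// The resolved location is then re-exposed to the data source as the `path`
// option: options = storageProperties ++ location.map("path" -> _)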

http://git-wip-us.apache.org/repos/asf/spark/blob/3a1bc6f4/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 61e0550..52af915 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -485,14 +485,6 @@ case class AlterTableRecoverPartitionsCommand(
     }
   }
 
-  private def getBasePath(table: CatalogTable): Option[String] = {
-    if (table.provider == Some("hive")) {
-      table.storage.locationUri
-    } else {
-      new CaseInsensitiveMap(table.storage.properties).get("path")
-    }
-  }
-
   override def run(spark: SparkSession): Seq[Row] = {
     val catalog = spark.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
@@ -503,13 +495,12 @@ case class AlterTableRecoverPartitionsCommand(
         s"Operation not allowed: $cmd only works on partitioned tables: $tableIdentWithDB")
     }
 
-    val tablePath = getBasePath(table)
-    if (tablePath.isEmpty) {
+    if (table.storage.locationUri.isEmpty) {
       throw new AnalysisException(s"Operation not allowed: $cmd only works on table with " +
         s"location provided: $tableIdentWithDB")
     }
 
-    val root = new Path(tablePath.get)
+    val root = new Path(table.storage.locationUri.get)
     logInfo(s"Recover all the partitions in $root")
     val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
 
@@ -688,15 +679,7 @@ case class AlterTableSetLocationCommand(
         catalog.alterPartitions(table.identifier, Seq(newPart))
       case None =>
         // No partition spec is specified, so we set the location for the table itself
-        val newTable =
-          if (DDLUtils.isDatasourceTable(table)) {
-            table.withNewStorage(
-              locationUri = Some(location),
-              properties = table.storage.properties ++ Map("path" -> location))
-          } else {
-            table.withNewStorage(locationUri = Some(location))
-          }
-        catalog.alterTable(newTable)
+        catalog.alterTable(table.withNewStorage(locationUri = Some(location)))
     }
     Seq.empty[Row]
   }
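
With the unification, SET LOCATION needs no data source special case. An illustrative round trip (assumes an active session `spark`; the table name and paths are made up):

spark.sql("CREATE TABLE t(i INT) USING parquet")
spark.sql("ALTER TABLE t SET LOCATION '/tmp/new_location'")
// Only storage.locationUri is rewritten; no "path" storage property is patched
// up, for data source and Hive serde tables alike.
spark.sql("DESCRIBE EXTENDED t").show(truncate = false)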

http://git-wip-us.apache.org/repos/asf/spark/blob/3a1bc6f4/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 4acfffb..f32c956 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -62,25 +63,6 @@ case class CreateTableLikeCommand(
     val catalog = sparkSession.sessionState.catalog
     val sourceTableDesc = catalog.getTempViewOrPermanentTableMetadata(sourceTable)
 
-    // Storage format
-    val newStorage =
-      if (sourceTableDesc.tableType == CatalogTableType.VIEW) {
-        val newPath = catalog.defaultTablePath(targetTable)
-        CatalogStorageFormat.empty.copy(properties = Map("path" -> newPath))
-      } else if (DDLUtils.isDatasourceTable(sourceTableDesc)) {
-        val newPath = catalog.defaultTablePath(targetTable)
-        val newSerdeProp =
-          sourceTableDesc.storage.properties.filterKeys(_.toLowerCase != "path") ++
-            Map("path" -> newPath)
-        sourceTableDesc.storage.copy(
-          locationUri = None,
-          properties = newSerdeProp)
-      } else {
-        sourceTableDesc.storage.copy(
-          locationUri = None,
-          properties = sourceTableDesc.storage.properties)
-      }
-
     val newProvider = if (sourceTableDesc.tableType == CatalogTableType.VIEW) {
       Some(sparkSession.sessionState.conf.defaultDataSourceName)
     } else {
@@ -91,7 +73,8 @@ case class CreateTableLikeCommand(
       CatalogTable(
         identifier = targetTable,
         tableType = CatalogTableType.MANAGED,
-        storage = newStorage,
+        // We are creating a new managed table, which should not have a custom table location.
+        storage = sourceTableDesc.storage.copy(locationUri = None),
         schema = sourceTableDesc.schema,
         provider = newProvider,
         partitionColumnNames = sourceTableDesc.partitionColumnNames,
@@ -170,13 +153,6 @@ case class AlterTableRenameCommand(
           case NonFatal(e) => log.warn(e.toString, e)
         }
       }
-      // For datasource tables, we also need to update the "path" serde property
-      if (DDLUtils.isDatasourceTable(table) && table.tableType == CatalogTableType.MANAGED) {
-        val newPath = catalog.defaultTablePath(newName)
-        val newTable = table.withNewStorage(
-          properties = table.storage.properties ++ Map("path" -> newPath))
-        catalog.alterTable(newTable)
-      }
       // Invalidate the table last, otherwise uncaching the table would load the logical plan
       // back into the hive metastore cache
       catalog.refreshTable(oldName)
@@ -367,8 +343,9 @@ case class TruncateTableCommand(
       DDLUtils.verifyPartitionProviderIsHive(spark, table, "TRUNCATE TABLE ... PARTITION")
     }
     val locations =
-      if (DDLUtils.isDatasourceTable(table)) {
-        Seq(table.storage.properties.get("path"))
+      // TODO: The `InMemoryCatalog` doesn't support listPartition with partial partition spec.
+      if (spark.conf.get(CATALOG_IMPLEMENTATION) == "in-memory") {
+        Seq(table.storage.locationUri)
       } else if (table.partitionColumnNames.isEmpty) {
         Seq(table.storage.locationUri)
       } else {
@@ -916,17 +893,18 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
   }
 
   private def showDataSourceTableOptions(metadata: CatalogTable, builder: StringBuilder): Unit = {
-    val props = metadata.properties
-
     builder ++= s"USING ${metadata.provider.get}\n"
 
-    val dataSourceOptions = metadata.storage.properties.filterNot {
-      case (key, value) =>
+    val dataSourceOptions = metadata.storage.properties.map {
+      case (key, value) => s"${quoteIdentifier(key)} '${escapeSingleQuotedString(value)}'"
+    } ++ metadata.storage.locationUri.flatMap { location =>
+      if (metadata.tableType == MANAGED) {
         // If it's a managed table, omit PATH option. Spark SQL always creates external table
         // when the table creation DDL contains the PATH option.
-        key.toLowerCase == "path" && metadata.tableType == MANAGED
-    }.map {
-      case (key, value) => s"${quoteIdentifier(key)} '${escapeSingleQuotedString(value)}'"
+        None
+      } else {
+        Some(s"path '${escapeSingleQuotedString(location)}'")
+      }
     }
 
     if (dataSourceOptions.nonEmpty) {
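
A hedged sketch of the rendering rule above (simplified types; Spark's quoting and escaping helpers are elided): the location surfaces as a `path` option only for external tables.

// props: the data source options; locationUri: the unified table location.
def renderOptions(
    props: Map[String, String],
    locationUri: Option[String],
    isManaged: Boolean): Seq[String] = {
  val opts = props.map { case (k, v) => s"$k '$v'" }.toSeq
  // A MANAGED table omits the path: emitting it would turn the re-created
  // table into an EXTERNAL one.
  val pathOpt = if (isManaged) None else locationUri.map(l => s"path '$l'")
  opts ++ pathOpt
}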

http://git-wip-us.apache.org/repos/asf/spark/blob/3a1bc6f4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 3f956c4..0b50448 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
@@ -78,115 +78,9 @@ case class DataSource(
 
   case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String])
 
-  lazy val providingClass: Class[_] = lookupDataSource(className)
+  lazy val providingClass: Class[_] = DataSource.lookupDataSource(className)
   lazy val sourceInfo = sourceSchema()
 
-  /** A map to maintain backward compatibility in case we move data sources around. */
-  private val backwardCompatibilityMap: Map[String, String] = {
-    val jdbc = classOf[JdbcRelationProvider].getCanonicalName
-    val json = classOf[JsonFileFormat].getCanonicalName
-    val parquet = classOf[ParquetFileFormat].getCanonicalName
-    val csv = classOf[CSVFileFormat].getCanonicalName
-    val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat"
-    val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"
-
-    Map(
-      "org.apache.spark.sql.jdbc" -> jdbc,
-      "org.apache.spark.sql.jdbc.DefaultSource" -> jdbc,
-      "org.apache.spark.sql.execution.datasources.jdbc.DefaultSource" -> jdbc,
-      "org.apache.spark.sql.execution.datasources.jdbc" -> jdbc,
-      "org.apache.spark.sql.json" -> json,
-      "org.apache.spark.sql.json.DefaultSource" -> json,
-      "org.apache.spark.sql.execution.datasources.json" -> json,
-      "org.apache.spark.sql.execution.datasources.json.DefaultSource" -> json,
-      "org.apache.spark.sql.parquet" -> parquet,
-      "org.apache.spark.sql.parquet.DefaultSource" -> parquet,
-      "org.apache.spark.sql.execution.datasources.parquet" -> parquet,
-      "org.apache.spark.sql.execution.datasources.parquet.DefaultSource" -> parquet,
-      "org.apache.spark.sql.hive.orc.DefaultSource" -> orc,
-      "org.apache.spark.sql.hive.orc" -> orc,
-      "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
-      "org.apache.spark.ml.source.libsvm" -> libsvm,
-      "com.databricks.spark.csv" -> csv
-    )
-  }
-
-  /**
-   * Class that were removed in Spark 2.0. Used to detect incompatibility libraries for Spark 2.0.
-   */
-  private val spark2RemovedClasses = Set(
-    "org.apache.spark.sql.DataFrame",
-    "org.apache.spark.sql.sources.HadoopFsRelationProvider",
-    "org.apache.spark.Logging")
-
-  /** Given a provider name, look up the data source class definition. */
-  private def lookupDataSource(provider0: String): Class[_] = {
-    val provider = backwardCompatibilityMap.getOrElse(provider0, provider0)
-    val provider2 = s"$provider.DefaultSource"
-    val loader = Utils.getContextOrSparkClassLoader
-    val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
-
-    try {
-      serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider)).toList match {
-        // the provider format did not match any given registered aliases
-        case Nil =>
-          try {
-            Try(loader.loadClass(provider)).orElse(Try(loader.loadClass(provider2))) match {
-              case Success(dataSource) =>
-                // Found the data source using fully qualified path
-                dataSource
-              case Failure(error) =>
-                if (provider.toLowerCase == "orc" ||
-                  provider.startsWith("org.apache.spark.sql.hive.orc")) {
-                  throw new AnalysisException(
-                    "The ORC data source must be used with Hive support enabled")
-                } else if (provider.toLowerCase == "avro" ||
-                  provider == "com.databricks.spark.avro") {
-                  throw new AnalysisException(
-                    s"Failed to find data source: ${provider.toLowerCase}. Please find an Avro " +
-                      "package at " +
-                      "https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects")
-                } else {
-                  throw new ClassNotFoundException(
-                    s"Failed to find data source: $provider. Please find packages at " +
-                      "https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects",
-                    error)
-                }
-            }
-          } catch {
-            case e: NoClassDefFoundError => // This one won't be caught by Scala NonFatal
-              // NoClassDefFoundError's class name uses "/" rather than "." for packages
-              val className = e.getMessage.replaceAll("/", ".")
-              if (spark2RemovedClasses.contains(className)) {
-                throw new ClassNotFoundException(s"$className was removed in Spark 2.0. " +
-                  "Please check if your library is compatible with Spark 2.0", e)
-              } else {
-                throw e
-              }
-          }
-        case head :: Nil =>
-          // there is exactly one registered alias
-          head.getClass
-        case sources =>
-          // There are multiple registered aliases for the input
-          sys.error(s"Multiple sources found for $provider " +
-            s"(${sources.map(_.getClass.getName).mkString(", ")}), " +
-            "please specify the fully qualified class name.")
-      }
-    } catch {
-      case e: ServiceConfigurationError if e.getCause.isInstanceOf[NoClassDefFoundError] =>
-        // NoClassDefFoundError's class name uses "/" rather than "." for packages
-        val className = e.getCause.getMessage.replaceAll("/", ".")
-        if (spark2RemovedClasses.contains(className)) {
-          throw new ClassNotFoundException(s"Detected an incompatible DataSourceRegister. " +
-            "Please remove the incompatible library from classpath or upgrade it. " +
-            s"Error: ${e.getMessage}", e)
-        } else {
-          throw e
-        }
-    }
-  }
-
   /**
    * Infer the schema of the given FileFormat, returns a pair of schema and partition column names.
    */
@@ -470,13 +364,14 @@ case class DataSource(
         //  1. Only one output path can be specified on the write path;
         //  2. Output path must be a legal HDFS style file system path;
         //  3. It's OK that the output path doesn't exist yet;
-        val caseInsensitiveOptions = new CaseInsensitiveMap(options)
-        val outputPath = {
-          val path = new Path(caseInsensitiveOptions.getOrElse("path", {
-            throw new IllegalArgumentException("'path' is not specified")
-          }))
+        val allPaths = paths ++ new CaseInsensitiveMap(options).get("path")
+        val outputPath = if (allPaths.length == 1) {
+          val path = new Path(allPaths.head)
           val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
           path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+        } else {
+          throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
+            s"got: ${allPaths.mkString(", ")}")
         }
 
         val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
@@ -539,3 +434,123 @@ case class DataSource(
     }
   }
 }
+
+object DataSource {
+
+  /** A map to maintain backward compatibility in case we move data sources around. */
+  private val backwardCompatibilityMap: Map[String, String] = {
+    val jdbc = classOf[JdbcRelationProvider].getCanonicalName
+    val json = classOf[JsonFileFormat].getCanonicalName
+    val parquet = classOf[ParquetFileFormat].getCanonicalName
+    val csv = classOf[CSVFileFormat].getCanonicalName
+    val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat"
+    val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"
+
+    Map(
+      "org.apache.spark.sql.jdbc" -> jdbc,
+      "org.apache.spark.sql.jdbc.DefaultSource" -> jdbc,
+      "org.apache.spark.sql.execution.datasources.jdbc.DefaultSource" -> jdbc,
+      "org.apache.spark.sql.execution.datasources.jdbc" -> jdbc,
+      "org.apache.spark.sql.json" -> json,
+      "org.apache.spark.sql.json.DefaultSource" -> json,
+      "org.apache.spark.sql.execution.datasources.json" -> json,
+      "org.apache.spark.sql.execution.datasources.json.DefaultSource" -> json,
+      "org.apache.spark.sql.parquet" -> parquet,
+      "org.apache.spark.sql.parquet.DefaultSource" -> parquet,
+      "org.apache.spark.sql.execution.datasources.parquet" -> parquet,
+      "org.apache.spark.sql.execution.datasources.parquet.DefaultSource" -> parquet,
+      "org.apache.spark.sql.hive.orc.DefaultSource" -> orc,
+      "org.apache.spark.sql.hive.orc" -> orc,
+      "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
+      "org.apache.spark.ml.source.libsvm" -> libsvm,
+      "com.databricks.spark.csv" -> csv
+    )
+  }
+
+  /**
+   * Classes that were removed in Spark 2.0. Used to detect incompatible libraries for Spark 2.0.
+   */
+  private val spark2RemovedClasses = Set(
+    "org.apache.spark.sql.DataFrame",
+    "org.apache.spark.sql.sources.HadoopFsRelationProvider",
+    "org.apache.spark.Logging")
+
+  /** Given a provider name, look up the data source class definition. */
+  def lookupDataSource(provider: String): Class[_] = {
+    val provider1 = backwardCompatibilityMap.getOrElse(provider, provider)
+    val provider2 = s"$provider1.DefaultSource"
+    val loader = Utils.getContextOrSparkClassLoader
+    val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
+
+    try {
+      serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider1)).toList match {
+        // the provider format did not match any given registered aliases
+        case Nil =>
+          try {
+            Try(loader.loadClass(provider1)).orElse(Try(loader.loadClass(provider2))) match {
+              case Success(dataSource) =>
+                // Found the data source using fully qualified path
+                dataSource
+              case Failure(error) =>
+                if (provider1.toLowerCase == "orc" ||
+                  provider1.startsWith("org.apache.spark.sql.hive.orc")) {
+                  throw new AnalysisException(
+                    "The ORC data source must be used with Hive support enabled")
+                } else if (provider1.toLowerCase == "avro" ||
+                  provider1 == "com.databricks.spark.avro") {
+                  throw new AnalysisException(
+                    s"Failed to find data source: ${provider1.toLowerCase}. Please find an Avro " +
+                      "package at " +
+                      "https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects")
+                } else {
+                  throw new ClassNotFoundException(
+                    s"Failed to find data source: $provider1. Please find packages at " +
+                      "https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects",
+                    error)
+                }
+            }
+          } catch {
+            case e: NoClassDefFoundError => // This one won't be caught by Scala NonFatal
+              // NoClassDefFoundError's class name uses "/" rather than "." for packages
+              val className = e.getMessage.replaceAll("/", ".")
+              if (spark2RemovedClasses.contains(className)) {
+                throw new ClassNotFoundException(s"$className was removed in Spark 2.0. " +
+                  "Please check if your library is compatible with Spark 2.0", e)
+              } else {
+                throw e
+              }
+          }
+        case head :: Nil =>
+          // there is exactly one registered alias
+          head.getClass
+        case sources =>
+          // There are multiple registered aliases for the input
+          sys.error(s"Multiple sources found for $provider1 " +
+            s"(${sources.map(_.getClass.getName).mkString(", ")}), " +
+            "please specify the fully qualified class name.")
+      }
+    } catch {
+      case e: ServiceConfigurationError if e.getCause.isInstanceOf[NoClassDefFoundError] =>
+        // NoClassDefFoundError's class name uses "/" rather than "." for packages
+        val className = e.getCause.getMessage.replaceAll("/", ".")
+        if (spark2RemovedClasses.contains(className)) {
+          throw new ClassNotFoundException(s"Detected an incompatible DataSourceRegister. " +
+            "Please remove the incompatible library from classpath or upgrade it. " +
+            s"Error: ${e.getMessage}", e)
+        } else {
+          throw e
+        }
+    }
+  }
+
+  /**
+   * When creating a data source table, the `path` option has a special meaning: the table location.
+   * This method extracts the `path` option, treats it as the table location, and builds a
+   * [[CatalogStorageFormat]] from it. Note that the `path` option is removed from the options.
+   */
+  def buildStorageFormatFromOptions(options: Map[String, String]): CatalogStorageFormat = {
+    val path = new CaseInsensitiveMap(options).get("path")
+    val optionsWithoutPath = options.filterKeys(_.toLowerCase != "path")
+    CatalogStorageFormat.empty.copy(locationUri = path, properties = optionsWithoutPath)
+  }
+}
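
An assumed usage of the new helper (values made up), showing that the `path` extraction is case-insensitive and that the key is dropped from the remaining options:

val storage = DataSource.buildStorageFormatFromOptions(
  Map("PATH" -> "/tmp/t", "compression" -> "snappy"))
// storage.locationUri == Some("/tmp/t")
// storage.properties  == Map("compression" -> "snappy")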

http://git-wip-us.apache.org/repos/asf/spark/blob/3a1bc6f4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 47c1f9d..e87998f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -237,6 +237,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
       sparkSession: SparkSession,
       simpleCatalogRelation: SimpleCatalogRelation): LogicalPlan = {
     val table = simpleCatalogRelation.catalogTable
+    val pathOption = table.storage.locationUri.map("path" -> _)
     val dataSource =
       DataSource(
         sparkSession,
@@ -244,7 +245,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
         partitionColumns = table.partitionColumnNames,
         bucketSpec = table.bucketSpec,
         className = table.provider.get,
-        options = table.storage.properties)
+        options = table.storage.properties ++ pathOption)
 
     LogicalRelation(
       dataSource.resolveRelation(),

http://git-wip-us.apache.org/repos/asf/spark/blob/3a1bc6f4/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 44fd38d..d3e323c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdenti
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
-import org.apache.spark.sql.execution.datasources.CreateTable
+import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource}
 import org.apache.spark.sql.types.StructType
 
 
@@ -354,7 +354,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
     val tableDesc = CatalogTable(
       identifier = tableIdent,
       tableType = CatalogTableType.EXTERNAL,
-      storage = CatalogStorageFormat.empty.copy(properties = options),
+      storage = DataSource.buildStorageFormatFromOptions(options),
       schema = schema,
       provider = Some(source)
     )
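
An illustration with the public Catalog API of this era (the table name is made up, and /tmp/ext_tbl is assumed to already contain Parquet data): options passed here now go through buildStorageFormatFromOptions, so `path` lands in locationUri rather than staying a plain storage property.

spark.catalog.createExternalTable("ext_tbl", "parquet", Map("path" -> "/tmp/ext_tbl"))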

http://git-wip-us.apache.org/repos/asf/spark/blob/3a1bc6f4/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 9fb0f53..bde3c8a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1145,7 +1145,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
           assert(storageFormat.properties.isEmpty)
           assert(storageFormat.locationUri === Some(expected))
         } else {
-          assert(storageFormat.properties.get("path") === Some(expected))
           assert(storageFormat.locationUri === Some(expected))
         }
       } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/3a1bc6f4/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
new file mode 100644
index 0000000..bef47aa
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
@@ -0,0 +1,136 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.sources
+
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{IntegerType, Metadata, MetadataBuilder, StructType}
+
+class TestOptionsSource extends SchemaRelationProvider with CreatableRelationProvider {
+
+  // This is used in the read path.
+  override def createRelation(
+      sqlContext: SQLContext,
+      parameters: Map[String, String],
+      schema: StructType): BaseRelation = {
+    new TestOptionsRelation(parameters)(sqlContext.sparkSession)
+  }
+
+  // This is used in the write path.
+  override def createRelation(
+      sqlContext: SQLContext,
+      mode: SaveMode,
+      parameters: Map[String, String],
+      data: DataFrame): BaseRelation = {
+    new TestOptionsRelation(parameters)(sqlContext.sparkSession)
+  }
+}
+
+class TestOptionsRelation(val options: Map[String, String])(@transient val session: SparkSession)
+  extends BaseRelation {
+
+  override def sqlContext: SQLContext = session.sqlContext
+
+  def pathOption: Option[String] = options.get("path")
+
+  // We can't get the relation directly on the write path, so we put the path option in the
+  // schema metadata so that we can test it later.
+  override def schema: StructType = {
+    val metadataWithPath = pathOption.map {
+      path => new MetadataBuilder().putString("path", path).build()
+    }
+    new StructType().add("i", IntegerType, true, metadataWithPath.getOrElse(Metadata.empty))
+  }
+}
+
+class PathOptionSuite extends DataSourceTest with SharedSQLContext {
+
+  test("path option always exist") {
+    withTable("src") {
+      sql(
+        s"""
+           |CREATE TABLE src(i int)
+           |USING ${classOf[TestOptionsSource].getCanonicalName}
+           |OPTIONS (PATH '/tmp/path')
+        """.stripMargin)
+      assert(getPathOption("src") == Some("/tmp/path"))
+    }
+
+    // should exist even if the path option is not specified when creating the table
+    withTable("src") {
+      sql(s"CREATE TABLE src(i int) USING ${classOf[TestOptionsSource].getCanonicalName}")
+      assert(getPathOption("src") == Some(defaultTablePath("src")))
+    }
+  }
+
+  test("path option also exist for write path") {
+    withTable("src") {
+      withTempPath { path =>
+        sql(
+          s"""
+            |CREATE TABLE src
+            |USING ${classOf[TestOptionsSource].getCanonicalName}
+            |OPTIONS (PATH '${path.getAbsolutePath}')
+            |AS SELECT 1
+          """.stripMargin)
+        assert(spark.table("src").schema.head.metadata.getString("path") == path.getAbsolutePath)
+      }
+    }
+
+    // should exist even if the path option is not specified when creating the table
+    withTable("src") {
+      sql(
+        s"""
+           |CREATE TABLE src
+           |USING ${classOf[TestOptionsSource].getCanonicalName}
+           |AS SELECT 1
+          """.stripMargin)
+      assert(spark.table("src").schema.head.metadata.getString("path") == defaultTablePath("src"))
+    }
+  }
+
+  test("path option always represent the value of table location") {
+    withTable("src") {
+      sql(
+        s"""
+           |CREATE TABLE src(i int)
+           |USING ${classOf[TestOptionsSource].getCanonicalName}
+           |OPTIONS (PATH '/tmp/path')""".stripMargin)
+      sql("ALTER TABLE src SET LOCATION '/tmp/path2'")
+      assert(getPathOption("src") == Some("/tmp/path2"))
+    }
+
+    withTable("src", "src2") {
+      sql(s"CREATE TABLE src(i int) USING ${classOf[TestOptionsSource].getCanonicalName}")
+      sql("ALTER TABLE src RENAME TO src2")
+      assert(getPathOption("src2") == Some(defaultTablePath("src2")))
+    }
+  }
+
+  private def getPathOption(tableName: String): Option[String] = {
+    spark.table(tableName).queryExecution.analyzed.collect {
+      case LogicalRelation(r: TestOptionsRelation, _, _) => r.pathOption
+    }.head
+  }
+
+  private def defaultTablePath(tableName: String): String = {
+    spark.sessionState.catalog.defaultTablePath(TableIdentifier(tableName))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3a1bc6f4/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 409c316..ebba203 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -29,7 +29,7 @@ import org.apache.thrift.TException
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions._
@@ -38,9 +38,8 @@ import org.apache.spark.sql.execution.command.{ColumnStatStruct, DDLUtils}
 import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
 import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.internal.HiveSerDe
-import org.apache.spark.sql.internal.SQLConf._
 import org.apache.spark.sql.internal.StaticSQLConf._
-import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.types.{DataType, StructType}
 
 
 /**
@@ -189,66 +188,39 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       throw new TableAlreadyExistsException(db = db, table = table)
     }
     // Before saving data source table metadata into Hive metastore, we should:
-    //  1. Put table provider, schema, partition column names, bucket specification and partition
-    //     provider in table properties.
+    //  1. Put table metadata like provider, schema, etc. in table properties.
     //  2. Check if this table is hive compatible
-    //    2.1  If it's not hive compatible, set schema, partition columns and bucket spec to empty
-    //         and save table metadata to Hive.
+    //    2.1  If it's not hive compatible, set location URI, schema, partition columns and bucket
+    //         spec to empty and save table metadata to Hive.
     //    2.2  If it's hive compatible, set serde information in table metadata and try to save
     //         it to Hive. If it fails, treat it as not hive compatible and go back to 2.1
     if (DDLUtils.isDatasourceTable(tableDefinition)) {
-      // data source table always have a provider, it's guaranteed by `DDLUtils.isDatasourceTable`.
-      val provider = tableDefinition.provider.get
-      val partitionColumns = tableDefinition.partitionColumnNames
-      val bucketSpec = tableDefinition.bucketSpec
-
-      val tableProperties = new scala.collection.mutable.HashMap[String, String]
-      tableProperties.put(DATASOURCE_PROVIDER, provider)
-      if (tableDefinition.partitionProviderIsHive) {
-        tableProperties.put(TABLE_PARTITION_PROVIDER, "hive")
-      }
-
-      // Serialized JSON schema string may be too long to be stored into a single metastore table
-      // property. In this case, we split the JSON string and store each part as a separate table
-      // property.
-      val threshold = conf.get(SCHEMA_STRING_LENGTH_THRESHOLD)
-      val schemaJsonString = tableDefinition.schema.json
-      // Split the JSON string.
-      val parts = schemaJsonString.grouped(threshold).toSeq
-      tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
-      parts.zipWithIndex.foreach { case (part, index) =>
-        tableProperties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
-      }
-
-      if (partitionColumns.nonEmpty) {
-        tableProperties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString)
-        partitionColumns.zipWithIndex.foreach { case (partCol, index) =>
-          tableProperties.put(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index", partCol)
-        }
-      }
-
-      if (bucketSpec.isDefined) {
-        val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get
+      val tableProperties = tableMetaToTableProps(tableDefinition)
 
-        tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETS, numBuckets.toString)
-        tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETCOLS, bucketColumnNames.length.toString)
-        bucketColumnNames.zipWithIndex.foreach { case (bucketCol, index) =>
-          tableProperties.put(s"$DATASOURCE_SCHEMA_BUCKETCOL_PREFIX$index", bucketCol)
-        }
-
-        if (sortColumnNames.nonEmpty) {
-          tableProperties.put(DATASOURCE_SCHEMA_NUMSORTCOLS, sortColumnNames.length.toString)
-          sortColumnNames.zipWithIndex.foreach { case (sortCol, index) =>
-            tableProperties.put(s"$DATASOURCE_SCHEMA_SORTCOL_PREFIX$index", sortCol)
-          }
-        }
+      val needDefaultTableLocation = tableDefinition.tableType == MANAGED &&
+        tableDefinition.storage.locationUri.isEmpty
+      val tableLocation = if (needDefaultTableLocation) {
+        Some(defaultTablePath(tableDefinition.identifier))
+      } else {
+        tableDefinition.storage.locationUri
       }
+      // Ideally we should also put `locationUri` in table properties like provider, schema, etc.
+      // However, older versions of Spark already store the table location in storage properties
+      // under the key "path". We keep this behaviour for backward compatibility.
+      val storagePropsWithLocation = tableDefinition.storage.properties ++
+        tableLocation.map("path" -> _)
 
       // converts the table metadata to Spark SQL specific format, i.e. set data schema, names and
       // bucket specification to empty. Note that partition columns are retained, so that we can
       // call partition-related Hive API later.
       def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
         tableDefinition.copy(
+          // Hive only allows directory paths as location URIs, while Spark SQL data source tables
+          // also allow file paths. For a non-Hive-compatible format we should not set the location
+          // URI, to avoid the Hive metastore throwing an exception.
+          storage = tableDefinition.storage.copy(
+            locationUri = None,
+            properties = storagePropsWithLocation),
           schema = tableDefinition.partitionSchema,
           bucketSpec = None,
           properties = tableDefinition.properties ++ tableProperties)
@@ -259,10 +231,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
         val location = if (tableDefinition.tableType == EXTERNAL) {
          // When we hit this branch, we are saving an external data source table in a Hive
          // compatible format, which means the data source is file-based and must have a location.
-          val map = new CaseInsensitiveMap(tableDefinition.storage.properties)
-          require(map.contains("path"),
+          require(tableDefinition.storage.locationUri.isDefined,
             "External file-based data source table must have a `path` entry in storage properties.")
-          Some(new Path(map("path")).toUri.toString)
+          Some(new Path(tableDefinition.storage.locationUri.get).toUri.toString)
         } else {
           None
         }
@@ -272,7 +243,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
             locationUri = location,
             inputFormat = serde.inputFormat,
             outputFormat = serde.outputFormat,
-            serde = serde.serde
+            serde = serde.serde,
+            properties = storagePropsWithLocation
           ),
           properties = tableDefinition.properties ++ tableProperties)
       }
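
For reference, the Hive-compatible branch above normalizes the user-supplied location
through Hadoop's `Path` before handing it to the metastore. A minimal standalone
sketch of that normalization, with illustrative values only (the helper name is
hypothetical):

    import org.apache.hadoop.fs.Path

    // Hadoop's Path collapses redundant separators and drops trailing slashes,
    // so equivalent spellings of a directory map to a single metastore location.
    def normalizeLocation(raw: String): String = new Path(raw).toUri.toString

    normalizeLocation("/tmp//warehouse/t1/")  // "/tmp/warehouse/t1"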
@@ -337,6 +309,68 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
   }
 
+  /**
+   * Data source tables may be non-Hive-compatible, so we need to store table metadata in table
+   * properties to work around some Hive metastore limitations.
+   * This method puts the table provider, partition provider, schema, partition column names and
+   * bucket specification into a map, which can be used as table properties later.
+   */
+  private def tableMetaToTableProps(table: CatalogTable): scala.collection.Map[String, String] = {
+    // Data source tables always have a provider; this is guaranteed by `DDLUtils.isDatasourceTable`.
+    val provider = table.provider.get
+    val partitionColumns = table.partitionColumnNames
+    val bucketSpec = table.bucketSpec
+
+    val properties = new scala.collection.mutable.HashMap[String, String]
+    properties.put(DATASOURCE_PROVIDER, provider)
+    if (table.partitionProviderIsHive) {
+      properties.put(TABLE_PARTITION_PROVIDER, "hive")
+    }
+
+    // The serialized JSON schema string may be too long to be stored in a single metastore table
+    // property. In this case, we split the JSON string and store each part as a separate table
+    // property.
+    val threshold = conf.get(SCHEMA_STRING_LENGTH_THRESHOLD)
+    val schemaJsonString = table.schema.json
+    // Split the JSON string.
+    val parts = schemaJsonString.grouped(threshold).toSeq
+    properties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
+    parts.zipWithIndex.foreach { case (part, index) =>
+      properties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
+    }
+
+    if (partitionColumns.nonEmpty) {
+      properties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString)
+      partitionColumns.zipWithIndex.foreach { case (partCol, index) =>
+        properties.put(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index", partCol)
+      }
+    }
+
+    if (bucketSpec.isDefined) {
+      val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get
+
+      properties.put(DATASOURCE_SCHEMA_NUMBUCKETS, numBuckets.toString)
+      properties.put(DATASOURCE_SCHEMA_NUMBUCKETCOLS, bucketColumnNames.length.toString)
+      bucketColumnNames.zipWithIndex.foreach { case (bucketCol, index) =>
+        properties.put(s"$DATASOURCE_SCHEMA_BUCKETCOL_PREFIX$index", bucketCol)
+      }
+
+      if (sortColumnNames.nonEmpty) {
+        properties.put(DATASOURCE_SCHEMA_NUMSORTCOLS, sortColumnNames.length.toString)
+        sortColumnNames.zipWithIndex.foreach { case (sortCol, index) =>
+          properties.put(s"$DATASOURCE_SCHEMA_SORTCOL_PREFIX$index", sortCol)
+        }
+      }
+    }
+
+    properties
+  }
+
+  private def defaultTablePath(tableIdent: TableIdentifier): String = {
+    val dbLocation = getDatabase(tableIdent.database.get).locationUri
+    new Path(new Path(dbLocation), tableIdent.table).toString
+  }
+
   private def saveTableIntoHive(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
     assert(DDLUtils.isDatasourceTable(tableDefinition),
       "saveTableIntoHive only takes data source table.")
@@ -383,11 +417,35 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   }
 
   override def renameTable(db: String, oldName: String, newName: String): Unit = withClient {
-    val newTable = client.getTable(db, oldName)
-      .copy(identifier = TableIdentifier(newName, Some(db)))
+    val rawTable = client.getTable(db, oldName)
+
+    val storageWithNewPath = if (rawTable.tableType == MANAGED) {
+      // If it's a managed table and we are renaming it, then the path option becomes inaccurate
+      // and we need to update it according to the new table name.
+      val newTablePath = defaultTablePath(TableIdentifier(newName, Some(db)))
+      updateLocationInStorageProps(rawTable, Some(newTablePath))
+    } else {
+      rawTable.storage
+    }
+
+    val newTable = rawTable.copy(
+      identifier = TableIdentifier(newName, Some(db)),
+      storage = storageWithNewPath)
+
     client.alterTable(oldName, newTable)
   }
 
+  private def getLocationFromStorageProps(table: CatalogTable): Option[String] = {
+    new CaseInsensitiveMap(table.storage.properties).get("path")
+  }
+
+  private def updateLocationInStorageProps(
+      table: CatalogTable,
+      newPath: Option[String]): CatalogStorageFormat = {
+    val propsWithoutPath = table.storage.properties.filterKeys(_.toLowerCase != "path")
+    table.storage.copy(properties = propsWithoutPath ++ newPath.map("path" -> _))
+  }
+
   /**
    * Alter a table whose name that matches the one specified in `tableDefinition`,
    * assuming the table exists.
@@ -418,21 +476,36 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
 
     if (DDLUtils.isDatasourceTable(withStatsProps)) {
-      val oldDef = client.getTable(db, withStatsProps.identifier.table)
-      // Sets the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition,
-      // to retain the spark specific format if it is. Also add old data source properties to table
-      // properties, to retain the data source table format.
-      val oldDataSourceProps = oldDef.properties.filter(_._1.startsWith(SPARK_SQL_PREFIX))
+      val oldTableDef = client.getTable(db, withStatsProps.identifier.table)
+
+      val oldLocation = getLocationFromStorageProps(oldTableDef)
+      val newLocation = tableDefinition.storage.locationUri
+      // Only update the `locationUri` field if the location has really changed, because this table
+      // may not be Hive-compatible and thus cannot set the `locationUri` field. We should respect
+      // the old `locationUri` even if it's None.
+      val storageWithNewLocation = if (oldLocation == newLocation) {
+        oldTableDef.storage
+      } else {
+        updateLocationInStorageProps(oldTableDef, newLocation).copy(locationUri = newLocation)
+      }
+
       val partitionProviderProp = if (tableDefinition.partitionProviderIsHive) {
         TABLE_PARTITION_PROVIDER -> "hive"
       } else {
         TABLE_PARTITION_PROVIDER -> "builtin"
       }
+
+      // Sets the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition,
+      // to retain the Spark SQL specific format if there is one. Also adds the old data source
+      // properties to the table properties, to retain the data source table format.
+      val oldDataSourceProps = oldTableDef.properties.filter(_._1.startsWith(SPARK_SQL_PREFIX))
+      val newTableProps = oldDataSourceProps ++ withStatsProps.properties + partitionProviderProp
       val newDef = withStatsProps.copy(
-        schema = oldDef.schema,
-        partitionColumnNames = oldDef.partitionColumnNames,
-        bucketSpec = oldDef.bucketSpec,
-        properties = oldDataSourceProps ++ withStatsProps.properties + partitionProviderProp)
+        storage = storageWithNewLocation,
+        schema = oldTableDef.schema,
+        partitionColumnNames = oldTableDef.partitionColumnNames,
+        bucketSpec = oldTableDef.bucketSpec,
+        properties = newTableProps)
 
       client.alterTable(newDef)
     } else {
@@ -465,22 +538,16 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     } else {
       getProviderFromTableProperties(table).map { provider =>
         assert(provider != "hive", "Hive serde table should not save provider in table properties.")
-        // SPARK-15269: Persisted data source tables always store the location URI as a storage
-        // property named "path" instead of standard Hive `dataLocation`, because Hive only
-        // allows directory paths as location URIs while Spark SQL data source tables also
-        // allows file paths. So the standard Hive `dataLocation` is meaningless for Spark SQL
-        // data source tables.
-        // Spark SQL may also save external data source in Hive compatible format when
-        // possible, so that these tables can be directly accessed by Hive. For these tables,
-        // `dataLocation` is still necessary. Here we also check for input format because only
-        // these Hive compatible tables set this field.
-        val storage = if (table.tableType == EXTERNAL && table.storage.inputFormat.isEmpty) {
-          table.storage.copy(locationUri = None)
-        } else {
-          table.storage
+        // Internally we store the table location in storage properties with the key "path" for
+        // data source tables. Here we set the table location in the `locationUri` field and filter
+        // out the path option from the storage properties, to avoid exposing this internal detail.
+        val storageWithLocation = {
+          val tableLocation = getLocationFromStorageProps(table)
+          updateLocationInStorageProps(table, None).copy(locationUri = tableLocation)
         }
+
         table.copy(
-          storage = storage,
+          storage = storageWithLocation,
           schema = getSchemaFromTableProperties(table),
           provider = Some(provider),
           partitionColumnNames = getPartitionColumnsFromTableProperties(table),

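The schema splitting in `tableMetaToTableProps` above is the workaround for the
metastore's limit on property value length. A minimal sketch of the write side,
assuming the `DATASOURCE_SCHEMA_NUMPARTS` / `DATASOURCE_SCHEMA_PART_PREFIX`
constants resolve to the `spark.sql.sources.schema.*` keys (the helper name is
hypothetical):

    import org.apache.spark.sql.types.StructType

    // Chunk the schema's JSON form so no single metastore property value
    // exceeds the configured threshold, then index the parts.
    def schemaToProps(schema: StructType, threshold: Int): Map[String, String] = {
      val parts = schema.json.grouped(threshold).toSeq
      val indexed = parts.zipWithIndex.map { case (part, i) =>
        s"spark.sql.sources.schema.part.$i" -> part
      }
      (indexed :+ ("spark.sql.sources.schema.numParts" -> parts.size.toString)).toMap
    }
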
http://git-wip-us.apache.org/repos/asf/spark/blob/3a1bc6f4/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 624ab74..8e5fc88 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -17,16 +17,13 @@
 
 package org.apache.spark.sql.hive
 
-import scala.collection.JavaConverters._
-
 import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
-import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.execution.command.DDLUtils
@@ -56,12 +53,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       tableIdent.table.toLowerCase)
   }
 
-  private def getQualifiedTableName(t: CatalogTable): QualifiedTableName = {
-    QualifiedTableName(
-      t.identifier.database.getOrElse(getCurrentDatabase).toLowerCase,
-      t.identifier.table.toLowerCase)
-  }
-
   /** A cache of Spark SQL data source tables that have been accessed. */
   protected[hive] val cachedDataSourceTables: LoadingCache[QualifiedTableName, LogicalPlan] = {
     val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() {
@@ -69,6 +60,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         logDebug(s"Creating new cached data source for $in")
         val table = sparkSession.sharedState.externalCatalog.getTable(in.database, in.name)
 
+        val pathOption = table.storage.locationUri.map("path" -> _)
         val dataSource =
           DataSource(
             sparkSession,
@@ -76,7 +68,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
             partitionColumns = table.partitionColumnNames,
             bucketSpec = table.bucketSpec,
             className = table.provider.get,
-            options = table.storage.properties,
+            options = table.storage.properties ++ pathOption,
             catalogTable = Some(table))
 
         LogicalRelation(dataSource.resolveRelation(), catalogTable = Some(table))

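One subtlety in the cache loader above: `table.storage.properties ++ pathOption`
lets the restored `locationUri` win over any leftover "path" entry, because the
right-hand operand of `++` takes precedence on duplicate keys. A REPL-style
illustration with made-up values:

    val props = Map("path" -> "/stale/location", "mergeSchema" -> "true")
    val pathOption = Some("/current/location").map("path" -> _)
    props ++ pathOption  // Map(path -> /current/location, mergeSchema -> true)
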
http://git-wip-us.apache.org/repos/asf/spark/blob/3a1bc6f4/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index 0477ea4..7abc4d9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -142,8 +142,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
           assert(hiveTable.storage.serde === Some(serde))
 
           assert(hiveTable.tableType === CatalogTableType.EXTERNAL)
-          assert(hiveTable.storage.locationUri ===
-            Some(path.toURI.toString.stripSuffix(File.separator)))
+          assert(hiveTable.storage.locationUri === Some(path.toString))
 
           val columns = hiveTable.schema
           assert(columns.map(_.name) === Seq("d1", "d2"))

http://git-wip-us.apache.org/repos/asf/spark/blob/3a1bc6f4/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index eaa67d3..c50f92e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -998,7 +998,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         identifier = TableIdentifier("not_skip_hive_metadata"),
         tableType = CatalogTableType.EXTERNAL,
         storage = CatalogStorageFormat.empty.copy(
-          properties = Map("path" -> tempPath.getCanonicalPath, "skipHiveMetadata" -> "false")
+          locationUri = Some(tempPath.getCanonicalPath),
+          properties = Map("skipHiveMetadata" -> "false")
         ),
         schema = schema,
         provider = Some("parquet")
@@ -1282,9 +1283,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         sql("insert into t values (2, 3, 4)")
         checkAnswer(table("t"), Seq(Row(1, 2, 3), Row(2, 3, 4)))
         val catalogTable = hiveClient.getTable("default", "t")
-        // there should not be a lowercase key 'path' now
-        assert(catalogTable.storage.properties.get("path").isEmpty)
-        assert(catalogTable.storage.properties.get("PATH").isDefined)
+        assert(catalogTable.storage.locationUri.isDefined)
       }
     }
   }
@@ -1351,4 +1350,25 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       sparkSession.sparkContext.conf.set(DEBUG_MODE, previousValue)
     }
   }
+
+  test("SPARK-17470: support old table that stores table location in storage properties") {
+    withTable("old") {
+      withTempPath { path =>
+        Seq(1 -> "a").toDF("i", "j").write.parquet(path.getAbsolutePath)
+        val tableDesc = CatalogTable(
+          identifier = TableIdentifier("old", Some("default")),
+          tableType = CatalogTableType.EXTERNAL,
+          storage = CatalogStorageFormat.empty.copy(
+            properties = Map("path" -> path.getAbsolutePath)
+          ),
+          schema = new StructType(),
+          properties = Map(
+            HiveExternalCatalog.DATASOURCE_PROVIDER -> "parquet",
+            HiveExternalCatalog.DATASOURCE_SCHEMA ->
+              new StructType().add("i", "int").add("j", "string").json))
+        hiveClient.createTable(tableDesc, ignoreIfExists = false)
+        checkAnswer(spark.table("old"), Row(1, "a"))
+      }
+    }
+  }
 }

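The new test above covers the legacy layout, where the whole schema JSON sits under
the single `spark.sql.sources.schema` key rather than the numbered parts written by
`tableMetaToTableProps`. A hedged sketch of the read-side fallback this implies (the
helper is hypothetical; the real logic lives in `getSchemaFromTableProperties`):

    import org.apache.spark.sql.types.{DataType, StructType}

    // Prefer the pre-split single-key layout; otherwise reassemble the
    // numbered parts in order and parse the concatenated JSON.
    def schemaFromProps(props: Map[String, String]): Option[StructType] = {
      val json = props.get("spark.sql.sources.schema").orElse {
        props.get("spark.sql.sources.schema.numParts").map { n =>
          (0 until n.toInt).map(i => props(s"spark.sql.sources.schema.part.$i")).mkString
        }
      }
      json.map(DataType.fromJson(_).asInstanceOf[StructType])
    }
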
http://git-wip-us.apache.org/repos/asf/spark/blob/3a1bc6f4/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
index 7ba880e..cfc1d81 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
@@ -29,7 +29,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
     val expectedPath =
       spark.sharedState.externalCatalog.getDatabase(dbName).locationUri + "/" + tableName
 
-    assert(metastoreTable.storage.properties("path") === expectedPath)
+    assert(metastoreTable.storage.locationUri.get === expectedPath)
   }
 
   private def getTableNames(dbName: Option[String] = None): Array[String] = {

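The `expectedPath` above, built by plain string concatenation, agrees with what
`defaultTablePath` produces through Hadoop's `Path`, as long as the database
location carries no trailing slash (values below are illustrative):

    import org.apache.hadoop.fs.Path

    val dbLocation = "/user/hive/warehouse/db1.db"  // illustrative value
    new Path(new Path(dbLocation), "tbl").toString  // "/user/hive/warehouse/db1.db/tbl"
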
http://git-wip-us.apache.org/repos/asf/spark/blob/3a1bc6f4/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index e9268a9..682d7d4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -859,14 +859,6 @@ class HiveDDLSuite
     }
   }
 
-  private def getTablePath(table: CatalogTable): Option[String] = {
-    if (DDLUtils.isDatasourceTable(table)) {
-      new CaseInsensitiveMap(table.storage.properties).get("path")
-    } else {
-      table.storage.locationUri
-    }
-  }
-
   private def checkCreateTableLike(sourceTable: CatalogTable, targetTable: CatalogTable): Unit = {
     // The created table should be a MANAGED table with empty view text and original text.
     assert(targetTable.tableType == CatalogTableType.MANAGED,
@@ -915,10 +907,8 @@ class HiveDDLSuite
       assert(targetTable.provider == sourceTable.provider)
     }
 
-    val sourceTablePath = getTablePath(sourceTable)
-    val targetTablePath = getTablePath(targetTable)
-    assert(targetTablePath.nonEmpty, "target table path should not be empty")
-    assert(sourceTablePath != targetTablePath,
+    assert(targetTable.storage.locationUri.nonEmpty, "target table path should not be empty")
+    assert(sourceTable.storage.locationUri != targetTable.storage.locationUri,
       "source table/view path should be different from target table path")
 
     // The source table contents should not been seen in the target table.

http://git-wip-us.apache.org/repos/asf/spark/blob/3a1bc6f4/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index b9353b5..3a597d6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -517,7 +517,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     val catalogTable =
       sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
     relation match {
-      case LogicalRelation(r: HadoopFsRelation, _, _) =>
+      case LogicalRelation(r: HadoopFsRelation, _, Some(table)) =>
         if (!isDataSourceTable) {
           fail(
             s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " +
@@ -525,7 +525,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         }
         userSpecifiedLocation match {
           case Some(location) =>
-            assert(r.options("path") === location)
+            assert(table.storage.locationUri.get === location)
           case None => // OK.
         }
         assert(catalogTable.provider.get === format)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org