Posted to commits@spark.apache.org by rx...@apache.org on 2016/11/05 07:59:01 UTC

spark git commit: [SPARK-17183][SPARK-17983][SPARK-18101][SQL] put hive serde table schema to table properties like data source table

Repository: spark
Updated Branches:
  refs/heads/master 6e2701815 -> 95ec4e25b


[SPARK-17183][SPARK-17983][SPARK-18101][SQL] put hive serde table schema to table properties like data source table

## What changes were proposed in this pull request?

For data source tables, we put the table schema, partition columns, etc. into table properties, to work around several Hive metastore issues, e.g. column names not being case-preserving and poor decimal type support.

We should also do this for Hive serde tables, to reduce the difference between Hive serde tables and data source tables, e.g. so that column names are case-preserving.
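
The sketch below illustrates the idea (a minimal sketch, not the actual `HiveExternalCatalog` code): serialize the case-preserving `StructType` to JSON when writing table metadata, and prefer it over the metastore schema when reading the table back. The object name and property key are hypothetical; the real implementation splits the JSON across several numbered table properties to stay within metastore value-length limits.

```scala
import org.apache.spark.sql.types.{DataType, StructType}

object SchemaInPropsSketch {
  // Hypothetical property key, for illustration only.
  private val SchemaKey = "spark.sql.sketch.schema"

  // Store the case-preserving schema as JSON in the table properties map.
  def schemaToProps(schema: StructType): Map[String, String] =
    Map(SchemaKey -> schema.json)

  // Restore the schema from table properties, falling back to the
  // (lower-cased, possibly lossy) schema reported by the metastore.
  def schemaFromProps(props: Map[String, String], metastoreSchema: StructType): StructType =
    props.get(SchemaKey)
      .map(json => DataType.fromJson(json).asInstanceOf[StructType])
      .getOrElse(metastoreSchema)

  def main(args: Array[String]): Unit = {
    val original = new StructType()
      .add("HelLo", "int", nullable = false)
      .add("WoRLd", "int", nullable = true)
    val props = schemaToProps(original)
    // Round-trips with case and nullability intact, unlike the metastore schema.
    assert(schemaFromProps(props, new StructType()) == original)
  }
}
```
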
## How was this patch tested?

existing tests, and a new test in `HiveExternalCatalog`

Author: Wenchen Fan <we...@databricks.com>

Closes #14750 from cloud-fan/minor1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/95ec4e25
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/95ec4e25
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/95ec4e25

Branch: refs/heads/master
Commit: 95ec4e25bb65f37f80222ffe70a95993a9149f80
Parents: 6e27018
Author: Wenchen Fan <we...@databricks.com>
Authored: Sat Nov 5 00:58:50 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Sat Nov 5 00:58:50 2016 -0700

----------------------------------------------------------------------
 .../sql/catalyst/catalog/ExternalCatalog.scala  |   8 +-
 .../sql/catalyst/catalog/InMemoryCatalog.scala  |   6 -
 .../org/apache/spark/sql/types/DataType.scala   |  24 ++
 .../catalyst/catalog/ExternalCatalogSuite.scala |  20 ++
 .../org/apache/spark/sql/DataFrameWriter.scala  |  10 +-
 .../spark/sql/execution/SparkSqlParser.scala    |   4 +-
 .../spark/sql/execution/SparkStrategies.scala   |   6 +-
 .../spark/sql/execution/command/ddl.scala       |   4 +-
 .../spark/sql/execution/datasources/rules.scala |   5 +-
 .../spark/sql/hive/HiveExternalCatalog.scala    | 218 ++++++++++++++-----
 .../input1-2-d3aa54d5436b7b59ff5c7091b7ca6145   |   4 +-
 .../input2-1-e0efeda558cd0194f4764a5735147b16   |   4 +-
 .../input2-2-aa9ab0598e0cb7a12c719f9b3d98dbfd   |   4 +-
 .../input2-4-235f92683416fab031e6e7490487b15b   |   6 +-
 ...w_columns-2-b74990316ec4245fd8a7011e684b39da |   6 +-
 .../hive/PartitionedTablePerfStatsSuite.scala   |   9 +-
 .../sql/hive/execution/SQLQuerySuite.scala      |   4 +-
 17 files changed, 245 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/95ec4e25/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index a5e0252..14dd707 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
-import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException}
+import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException, NoSuchTableException}
 import org.apache.spark.sql.catalyst.expressions.Expression
 
 
@@ -39,6 +39,12 @@ abstract class ExternalCatalog {
     }
   }
 
+  protected def requireTableExists(db: String, table: String): Unit = {
+    if (!tableExists(db, table)) {
+      throw new NoSuchTableException(db = db, table = table)
+    }
+  }
+
   protected def requireFunctionExists(db: String, funcName: String): Unit = {
     if (!functionExists(db, funcName)) {
       throw new NoSuchFunctionException(db = db, func = funcName)

http://git-wip-us.apache.org/repos/asf/spark/blob/95ec4e25/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index ea675b7..bc39688 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -64,12 +64,6 @@ class InMemoryCatalog(
     catalog(db).tables(table).partitions.contains(spec)
   }
 
-  private def requireTableExists(db: String, table: String): Unit = {
-    if (!tableExists(db, table)) {
-      throw new NoSuchTableException(db = db, table = table)
-    }
-  }
-
   private def requireTableNotExists(db: String, table: String): Unit = {
     if (tableExists(db, table)) {
       throw new TableAlreadyExistsException(db = db, table = table)

http://git-wip-us.apache.org/repos/asf/spark/blob/95ec4e25/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
index 312585d..2642d93 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
@@ -250,4 +250,28 @@ object DataType {
       case (fromDataType, toDataType) => fromDataType == toDataType
     }
   }
+
+  /**
+   * Compares two types, ignoring nullability of ArrayType, MapType, StructType, and ignoring case
+   * sensitivity of field names in StructType.
+   */
+  private[sql] def equalsIgnoreCaseAndNullability(from: DataType, to: DataType): Boolean = {
+    (from, to) match {
+      case (ArrayType(fromElement, _), ArrayType(toElement, _)) =>
+        equalsIgnoreCaseAndNullability(fromElement, toElement)
+
+      case (MapType(fromKey, fromValue, _), MapType(toKey, toValue, _)) =>
+        equalsIgnoreCaseAndNullability(fromKey, toKey) &&
+          equalsIgnoreCaseAndNullability(fromValue, toValue)
+
+      case (StructType(fromFields), StructType(toFields)) =>
+        fromFields.length == toFields.length &&
+          fromFields.zip(toFields).forall { case (l, r) =>
+            l.name.equalsIgnoreCase(r.name) &&
+              equalsIgnoreCaseAndNullability(l.dataType, r.dataType)
+          }
+
+      case (fromDataType, toDataType) => fromDataType == toDataType
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/95ec4e25/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index f283f42..66f92d1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -270,6 +270,26 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     assert(catalog.listTables("db2", "*1").toSet == Set("tbl1"))
   }
 
+  test("column names should be case-preserving and column nullability should be retained") {
+    val catalog = newBasicCatalog()
+    val tbl = CatalogTable(
+      identifier = TableIdentifier("tbl", Some("db1")),
+      tableType = CatalogTableType.MANAGED,
+      storage = storageFormat,
+      schema = new StructType()
+        .add("HelLo", "int", nullable = false)
+        .add("WoRLd", "int", nullable = true),
+      provider = Some("hive"),
+      partitionColumnNames = Seq("WoRLd"),
+      bucketSpec = Some(BucketSpec(4, Seq("HelLo"), Nil)))
+    catalog.createTable(tbl, ignoreIfExists = false)
+
+    val readBack = catalog.getTable("db1", "tbl")
+    assert(readBack.schema == tbl.schema)
+    assert(readBack.partitionColumnNames == tbl.partitionColumnNames)
+    assert(readBack.bucketSpec == tbl.bucketSpec)
+  }
+
   // --------------------------------------------------------------------------
   // Partitions
   // --------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/95ec4e25/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index f95362e..e0c8981 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -24,10 +24,10 @@ import scala.collection.JavaConverters._
 import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType}
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, OverwriteOptions, Union}
-import org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
-import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, CreateTable, DataSource, HadoopFsRelation}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, OverwriteOptions}
+import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, DDLUtils}
+import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, HadoopFsRelation}
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -359,7 +359,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   }
 
   private def saveAsTable(tableIdent: TableIdentifier): Unit = {
-    if (source.toLowerCase == "hive") {
+    if (source.toLowerCase == DDLUtils.HIVE_PROVIDER) {
       throw new AnalysisException("Cannot create hive serde table with saveAsTable API")
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/95ec4e25/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 634ffde..b8be3d1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -331,7 +331,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     }
     val options = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
     val provider = ctx.tableProvider.qualifiedName.getText
-    if (provider.toLowerCase == "hive") {
+    if (provider.toLowerCase == DDLUtils.HIVE_PROVIDER) {
       throw new AnalysisException("Cannot create hive serde table with CREATE TABLE USING")
     }
     val schema = Option(ctx.colTypeList()).map(createSchema)
@@ -1034,7 +1034,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       tableType = tableType,
       storage = storage,
       schema = schema,
-      provider = Some("hive"),
+      provider = Some(DDLUtils.HIVE_PROVIDER),
       partitionColumnNames = partitionCols.map(_.name),
       properties = properties,
       comment = comment)

http://git-wip-us.apache.org/repos/asf/spark/blob/95ec4e25/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 5412aca..190fdd8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -415,7 +415,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
   object DDLStrategy extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case CreateTable(tableDesc, mode, None) if tableDesc.provider.get == "hive" =>
+      case CreateTable(tableDesc, mode, None)
+        if tableDesc.provider.get == DDLUtils.HIVE_PROVIDER =>
         val cmd = CreateTableCommand(tableDesc, ifNotExists = mode == SaveMode.Ignore)
         ExecutedCommandExec(cmd) :: Nil
 
@@ -427,7 +428,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       // CREATE TABLE ... AS SELECT ... for hive serde table is handled in hive module, by rule
       // `CreateTables`
 
-      case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get != "hive" =>
+      case CreateTable(tableDesc, mode, Some(query))
+        if tableDesc.provider.get != DDLUtils.HIVE_PROVIDER =>
         val cmd =
           CreateDataSourceTableAsSelectCommand(
             tableDesc,

http://git-wip-us.apache.org/repos/asf/spark/blob/95ec4e25/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index b4d3ca1..8500ab4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -687,8 +687,10 @@ case class AlterTableSetLocationCommand(
 
 
 object DDLUtils {
+  val HIVE_PROVIDER = "hive"
+
   def isDatasourceTable(table: CatalogTable): Boolean = {
-    table.provider.isDefined && table.provider.get != "hive"
+    table.provider.isDefined && table.provider.get != HIVE_PROVIDER
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/95ec4e25/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 4647b11..5ba44ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, RowOrd
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
 import org.apache.spark.sql.types.{AtomicType, StructType}
@@ -127,7 +128,7 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
     checkDuplication(normalizedPartitionCols, "partition")
 
     if (schema.nonEmpty && normalizedPartitionCols.length == schema.length) {
-      if (tableDesc.provider.get == "hive") {
+      if (tableDesc.provider.get == DDLUtils.HIVE_PROVIDER) {
         // When we hit this branch, it means users didn't specify schema for the table to be
         // created, as we always include partition columns in table schema for hive serde tables.
         // The real schema will be inferred at hive metastore by hive serde, plus the given
@@ -292,7 +293,7 @@ object HiveOnlyCheck extends (LogicalPlan => Unit) {
   def apply(plan: LogicalPlan): Unit = {
     plan.foreach {
       case CreateTable(tableDesc, _, Some(_))
-          if tableDesc.provider.get == "hive" =>
+          if tableDesc.provider.get == DDLUtils.HIVE_PROVIDER =>
         throw new AnalysisException("Hive support is required to use CREATE Hive TABLE AS SELECT")
 
       case _ => // OK

http://git-wip-us.apache.org/repos/asf/spark/blob/95ec4e25/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 64ba526..b537061 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -95,8 +95,14 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
   }
 
-  private def requireTableExists(db: String, table: String): Unit = {
-    withClient { getTable(db, table) }
+  /**
+   * Get the raw table metadata from the Hive metastore directly. The raw table metadata may contain
+   * special data source properties and should not be exposed outside of `HiveExternalCatalog`. We
+   * should interpret these special data source properties and restore the original table metadata
+   * before returning it.
+   */
+  private def getRawTable(db: String, table: String): CatalogTable = withClient {
+    client.getTable(db, table)
   }
 
   /**
@@ -187,16 +193,32 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     if (tableExists(db, table) && !ignoreIfExists) {
       throw new TableAlreadyExistsException(db = db, table = table)
     }
-    // Before saving data source table metadata into Hive metastore, we should:
-    //  1. Put table metadata like provider, schema, etc. in table properties.
-    //  2. Check if this table is hive compatible
-    //    2.1  If it's not hive compatible, set location URI, schema, partition columns and bucket
-    //         spec to empty and save table metadata to Hive.
-    //    2.2  If it's hive compatible, set serde information in table metadata and try to save
-    //         it to Hive. If it fails, treat it as not hive compatible and go back to 2.1
-    if (DDLUtils.isDatasourceTable(tableDefinition)) {
+
+    if (tableDefinition.tableType == VIEW) {
+      client.createTable(tableDefinition, ignoreIfExists)
+    } else if (tableDefinition.provider.get == DDLUtils.HIVE_PROVIDER) {
+      // Here we follow data source tables and put table metadata like provider, schema, etc. in
+      // table properties, so that we can work around the Hive metastore issue of not being
+      // case-preserving and make Hive serde tables support mixed-case column names.
+      val tableWithDataSourceProps = tableDefinition.copy(
+        properties = tableDefinition.properties ++ tableMetaToTableProps(tableDefinition))
+      client.createTable(tableWithDataSourceProps, ignoreIfExists)
+    } else {
+      // To work around some hive metastore issues, e.g. not case-preserving, bad decimal type
+      // support, no column nullability, etc., we should do some extra work before saving table
+      // metadata into Hive metastore:
+      //  1. Put table metadata like provider, schema, etc. in table properties.
+      //  2. Check if this table is hive compatible.
+      //    2.1  If it's not hive compatible, set location URI, schema, partition columns and bucket
+      //         spec to empty and save table metadata to Hive.
+      //    2.2  If it's hive compatible, set serde information in table metadata and try to save
+      //         it to Hive. If it fails, treat it as not hive compatible and go back to 2.1
       val tableProperties = tableMetaToTableProps(tableDefinition)
 
+      // Ideally we should not create a managed table with a location, but a Hive serde table can
+      // specify a location for a managed table. And in [[CreateDataSourceTableAsSelectCommand]] we have
+      // to create the table directory and write out data before we create this table, to avoid
+      // exposing a partially written table.
       val needDefaultTableLocation = tableDefinition.tableType == MANAGED &&
         tableDefinition.storage.locationUri.isEmpty
       val tableLocation = if (needDefaultTableLocation) {
@@ -304,8 +326,6 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
           logWarning(message)
           saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists)
       }
-    } else {
-      client.createTable(tableDefinition, ignoreIfExists)
     }
   }
 
@@ -417,11 +437,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   }
 
   override def renameTable(db: String, oldName: String, newName: String): Unit = withClient {
-    val rawTable = client.getTable(db, oldName)
-
-    val storageWithNewPath = if (rawTable.tableType == MANAGED) {
-      // If it's a managed table and we are renaming it, then the path option becomes inaccurate
-      // and we need to update it according to the new table name.
+    val rawTable = getRawTable(db, oldName)
+
+    // Note that Hive serde tables don't use the path option in storage properties to store the
+    // table location, but store it directly in the `locationUri` field. The `locationUri` field
+    // will be updated automatically in the Hive metastore by the `alterTable` call at the end of
+    // this method. Here we only update the path option if it already exists in storage
+    // properties, to avoid adding an unnecessary path option for Hive serde tables.
+    val hasPathOption = new CaseInsensitiveMap(rawTable.storage.properties).contains("path")
+    val storageWithNewPath = if (rawTable.tableType == MANAGED && hasPathOption) {
+      // If it's a managed table with path option and we are renaming it, then the path option
+      // becomes inaccurate and we need to update it according to the new table name.
       val newTablePath = defaultTablePath(TableIdentifier(newName, Some(db)))
       updateLocationInStorageProps(rawTable, Some(newTablePath))
     } else {
@@ -442,7 +468,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   private def updateLocationInStorageProps(
       table: CatalogTable,
       newPath: Option[String]): CatalogStorageFormat = {
-    val propsWithoutPath = table.storage.properties.filterKeys(_.toLowerCase != "path")
+    // We can't use `filterKeys` here, as the map returned by `filterKeys` is not serializable,
+    // while `CatalogTable` should be serializable.
+    val propsWithoutPath = table.storage.properties.filter {
+      case (k, v) => k.toLowerCase != "path"
+    }
     table.storage.copy(properties = propsWithoutPath ++ newPath.map("path" -> _))
   }
 
@@ -475,18 +505,51 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       tableDefinition
     }
 
-    if (DDLUtils.isDatasourceTable(withStatsProps)) {
-      val oldTableDef = client.getTable(db, withStatsProps.identifier.table)
+    if (tableDefinition.tableType == VIEW) {
+      client.alterTable(withStatsProps)
+    } else {
+      val oldTableDef = getRawTable(db, withStatsProps.identifier.table)
 
-      val oldLocation = getLocationFromStorageProps(oldTableDef)
-      val newLocation = tableDefinition.storage.locationUri
-      // Only update the `locationUri` field if the location is really changed, because this table
-      // may be not Hive-compatible and can not set the `locationUri` field. We should respect the
-      // old `locationUri` even it's None.
-      val storageWithNewLocation = if (oldLocation == newLocation) {
-        oldTableDef.storage
+      val newStorage = if (tableDefinition.provider.get == DDLUtils.HIVE_PROVIDER) {
+        tableDefinition.storage
       } else {
-        updateLocationInStorageProps(oldTableDef, newLocation).copy(locationUri = newLocation)
+        // We can't alter the table storage of a data source table directly, for 2 reasons:
+        //   1. internally we use path option in storage properties to store the value of table
+        //      location, but the given `tableDefinition` is from outside and doesn't have the path
+        //      option, we need to add it manually.
+        //   2. this data source table may be created on a file, not a directory, then we can't set
+        //      the `locationUri` field and save it to Hive metastore, because Hive only allows
+        //      directory as table location.
+        //
+        // For example, an external data source table is created with a single file '/path/to/file'.
+        // Internally, we will add a path option with value '/path/to/file' to storage properties,
+        // and set the `locationUri` to a special value due to SPARK-15269 (please see
+        // `saveTableIntoHive` for more details). When users try to get the table metadata back, we
+        // will restore the `locationUri` field from the path option and remove the path option from
+        // storage properties. When users try to alter the table storage, the given
+        // `tableDefinition` will have `locationUri` field with value `/path/to/file` and the path
+        // option is not set.
+        //
+        // Here we need 2 extra steps:
+        //   1. add path option to storage properties, to match the internal format, i.e. using path
+        //      option to store the value of table location.
+        //   2. set the `locationUri` field back to the old one from the existing table metadata,
+        //      if users don't want to alter the table location. This step is necessary as the
+        //      `locationUri` is not always the same as the path option, e.g. in the above example
+        //      `locationUri` is a special value and we should respect it. Note that, if users
+        //       want to alter the table location to a file path, we will fail. This should be fixed
+        //       in the future.
+
+        val newLocation = tableDefinition.storage.locationUri
+        val storageWithPathOption = tableDefinition.storage.copy(
+          properties = tableDefinition.storage.properties ++ newLocation.map("path" -> _))
+
+        val oldLocation = getLocationFromStorageProps(oldTableDef)
+        if (oldLocation == newLocation) {
+          storageWithPathOption.copy(locationUri = oldTableDef.storage.locationUri)
+        } else {
+          storageWithPathOption
+        }
       }
 
       val partitionProviderProp = if (tableDefinition.tracksPartitionsInCatalog) {
@@ -498,23 +561,21 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       // Sets the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition,
       // to retain the spark specific format if it is. Also add old data source properties to table
       // properties, to retain the data source table format.
-      val oldDataSourceProps = oldTableDef.properties.filter(_._1.startsWith(SPARK_SQL_PREFIX))
+      val oldDataSourceProps = oldTableDef.properties.filter(_._1.startsWith(DATASOURCE_PREFIX))
       val newTableProps = oldDataSourceProps ++ withStatsProps.properties + partitionProviderProp
       val newDef = withStatsProps.copy(
-        storage = storageWithNewLocation,
+        storage = newStorage,
         schema = oldTableDef.schema,
         partitionColumnNames = oldTableDef.partitionColumnNames,
         bucketSpec = oldTableDef.bucketSpec,
         properties = newTableProps)
 
       client.alterTable(newDef)
-    } else {
-      client.alterTable(withStatsProps)
     }
   }
 
   override def getTable(db: String, table: String): CatalogTable = withClient {
-    restoreTableMetadata(client.getTable(db, table))
+    restoreTableMetadata(getRawTable(db, table))
   }
 
   override def getTableOption(db: String, table: String): Option[CatalogTable] = withClient {
@@ -536,28 +597,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val tableWithSchema = if (table.tableType == VIEW) {
       table
     } else {
-      getProviderFromTableProperties(table).map { provider =>
-        assert(provider != TABLE_PARTITION_PROVIDER_CATALOG,
-          "Hive serde table should not save provider in table properties.")
-        // Internally we store the table location in storage properties with key "path" for data
-        // source tables. Here we set the table location to `locationUri` field and filter out the
-        // path option in storage properties, to avoid exposing this concept externally.
-        val storageWithLocation = {
-          val tableLocation = getLocationFromStorageProps(table)
-          updateLocationInStorageProps(table, None).copy(locationUri = tableLocation)
-        }
-        val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)
-
-        table.copy(
-          storage = storageWithLocation,
-          schema = getSchemaFromTableProperties(table),
-          provider = Some(provider),
-          partitionColumnNames = getPartitionColumnsFromTableProperties(table),
-          bucketSpec = getBucketSpecFromTableProperties(table),
-          tracksPartitionsInCatalog = partitionProvider == Some(TABLE_PARTITION_PROVIDER_CATALOG)
-        )
-      } getOrElse {
-        table.copy(provider = Some("hive"), tracksPartitionsInCatalog = true)
+      getProviderFromTableProperties(table) match {
+        // No provider in table properties, which means this table was created by Spark prior to 2.1,
+        // or was created on the Hive side.
+        case None =>
+          table.copy(provider = Some(DDLUtils.HIVE_PROVIDER), tracksPartitionsInCatalog = true)
+
+        // This is a Hive serde table created by Spark 2.1 or higher versions.
+        case Some(DDLUtils.HIVE_PROVIDER) => restoreHiveSerdeTable(table)
+
+        // This is a regular data source table.
+        case Some(provider) => restoreDataSourceTable(table, provider)
       }
     }
 
@@ -583,6 +633,50 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     tableWithStats.copy(properties = getOriginalTableProperties(table))
   }
 
+  private def restoreHiveSerdeTable(table: CatalogTable): CatalogTable = {
+    val hiveTable = table.copy(
+      provider = Some(DDLUtils.HIVE_PROVIDER),
+      tracksPartitionsInCatalog = true)
+
+    val schemaFromTableProps = getSchemaFromTableProperties(table)
+    if (DataType.equalsIgnoreCaseAndNullability(schemaFromTableProps, table.schema)) {
+      hiveTable.copy(
+        schema = schemaFromTableProps,
+        partitionColumnNames = getPartitionColumnsFromTableProperties(table),
+        bucketSpec = getBucketSpecFromTableProperties(table))
+    } else {
+      // Hive metastore may change the table schema, e.g. via schema inference. If the table
+      // schema we read back is different (ignoring case and nullability) from the one in table
+      // properties, which was written when the table was created, we should respect the table
+      // schema from Hive.
+      logWarning(s"The table schema given by Hive metastore(${table.schema.simpleString}) is " +
+        "different from the schema when this table was created by Spark SQL" +
+        s"(${schemaFromTableProps.simpleString}). We have to fall back to the table schema from " +
+        "Hive metastore which is not case preserving.")
+      hiveTable
+    }
+  }
+
+  private def restoreDataSourceTable(table: CatalogTable, provider: String): CatalogTable = {
+    // Internally we store the table location in storage properties with key "path" for data
+    // source tables. Here we set the table location to `locationUri` field and filter out the
+    // path option in storage properties, to avoid exposing this concept externally.
+    val storageWithLocation = {
+      val tableLocation = getLocationFromStorageProps(table)
+      // We pass None as `newPath` here, to remove the path option in storage properties.
+      updateLocationInStorageProps(table, newPath = None).copy(locationUri = tableLocation)
+    }
+    val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)
+
+    table.copy(
+      provider = Some(provider),
+      storage = storageWithLocation,
+      schema = getSchemaFromTableProperties(table),
+      partitionColumnNames = getPartitionColumnsFromTableProperties(table),
+      bucketSpec = getBucketSpecFromTableProperties(table),
+      tracksPartitionsInCatalog = partitionProvider == Some(TABLE_PARTITION_PROVIDER_CATALOG))
+  }
+
   override def tableExists(db: String, table: String): Boolean = withClient {
     client.tableExists(db, table)
   }
@@ -623,7 +717,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
 
     val orderedPartitionSpec = new util.LinkedHashMap[String, String]()
     getTable(db, table).partitionColumnNames.foreach { colName =>
-      orderedPartitionSpec.put(colName, partition(colName))
+      // Hive metastore is not case-preserving and keeps partition columns with lower-cased names,
+      // and Hive will validate the column names in the partition spec to make sure they are
+      // partition columns. Here we lowercase the column names before passing the partition spec
+      // to the Hive client, to satisfy Hive.
+      orderedPartitionSpec.put(colName.toLowerCase, partition(colName))
     }
 
     client.loadPartition(
@@ -648,7 +746,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
 
     val orderedPartitionSpec = new util.LinkedHashMap[String, String]()
     getTable(db, table).partitionColumnNames.foreach { colName =>
-      orderedPartitionSpec.put(colName, partition(colName))
+      // Hive metastore is not case preserving and keeps partition columns with lower cased names,
+      // and Hive will validate the column names in partition spec to make sure they are partition
+      // columns. Here we Lowercase the column names before passing the partition spec to Hive
+      // client, to satisfy Hive.
+      orderedPartitionSpec.put(colName.toLowerCase, partition(colName))
     }
 
     client.loadDynamicPartitions(
@@ -754,7 +856,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       db: String,
       table: String,
       predicates: Seq[Expression]): Seq[CatalogTablePartition] = withClient {
-    val rawTable = client.getTable(db, table)
+    val rawTable = getRawTable(db, table)
     val catalogTable = restoreTableMetadata(rawTable)
     val partitionColumnNames = catalogTable.partitionColumnNames.toSet
     val nonPartitionPruningPredicates = predicates.filterNot {

http://git-wip-us.apache.org/repos/asf/spark/blob/95ec4e25/sql/hive/src/test/resources/golden/input1-2-d3aa54d5436b7b59ff5c7091b7ca6145
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/resources/golden/input1-2-d3aa54d5436b7b59ff5c7091b7ca6145 b/sql/hive/src/test/resources/golden/input1-2-d3aa54d5436b7b59ff5c7091b7ca6145
index d3ffb99..93ba96e 100644
--- a/sql/hive/src/test/resources/golden/input1-2-d3aa54d5436b7b59ff5c7091b7ca6145
+++ b/sql/hive/src/test/resources/golden/input1-2-d3aa54d5436b7b59ff5c7091b7ca6145
@@ -1,2 +1,2 @@
-a                   	int                 	                    
-b                   	double              	                    
+A                   	int
+B                   	double

http://git-wip-us.apache.org/repos/asf/spark/blob/95ec4e25/sql/hive/src/test/resources/golden/input2-1-e0efeda558cd0194f4764a5735147b16
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/resources/golden/input2-1-e0efeda558cd0194f4764a5735147b16 b/sql/hive/src/test/resources/golden/input2-1-e0efeda558cd0194f4764a5735147b16
index d3ffb99..93ba96e 100644
--- a/sql/hive/src/test/resources/golden/input2-1-e0efeda558cd0194f4764a5735147b16
+++ b/sql/hive/src/test/resources/golden/input2-1-e0efeda558cd0194f4764a5735147b16
@@ -1,2 +1,2 @@
-a                   	int                 	                    
-b                   	double              	                    
+A                   	int
+B                   	double

http://git-wip-us.apache.org/repos/asf/spark/blob/95ec4e25/sql/hive/src/test/resources/golden/input2-2-aa9ab0598e0cb7a12c719f9b3d98dbfd
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/resources/golden/input2-2-aa9ab0598e0cb7a12c719f9b3d98dbfd b/sql/hive/src/test/resources/golden/input2-2-aa9ab0598e0cb7a12c719f9b3d98dbfd
index d3ffb99..93ba96e 100644
--- a/sql/hive/src/test/resources/golden/input2-2-aa9ab0598e0cb7a12c719f9b3d98dbfd
+++ b/sql/hive/src/test/resources/golden/input2-2-aa9ab0598e0cb7a12c719f9b3d98dbfd
@@ -1,2 +1,2 @@
-a                   	int                 	                    
-b                   	double              	                    
+A                   	int
+B                   	double

http://git-wip-us.apache.org/repos/asf/spark/blob/95ec4e25/sql/hive/src/test/resources/golden/input2-4-235f92683416fab031e6e7490487b15b
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/resources/golden/input2-4-235f92683416fab031e6e7490487b15b b/sql/hive/src/test/resources/golden/input2-4-235f92683416fab031e6e7490487b15b
index 77eaef9..d52fcf0 100644
--- a/sql/hive/src/test/resources/golden/input2-4-235f92683416fab031e6e7490487b15b
+++ b/sql/hive/src/test/resources/golden/input2-4-235f92683416fab031e6e7490487b15b
@@ -1,3 +1,3 @@
-a                   	array<int>          	                    
-b                   	double              	                    
-c                   	map<double,int>     	                    
+A                   	array<int>
+B                   	double
+C                   	map<double,int>

http://git-wip-us.apache.org/repos/asf/spark/blob/95ec4e25/sql/hive/src/test/resources/golden/show_columns-2-b74990316ec4245fd8a7011e684b39da
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/resources/golden/show_columns-2-b74990316ec4245fd8a7011e684b39da b/sql/hive/src/test/resources/golden/show_columns-2-b74990316ec4245fd8a7011e684b39da
index 70c14c3..2f7168c 100644
--- a/sql/hive/src/test/resources/golden/show_columns-2-b74990316ec4245fd8a7011e684b39da
+++ b/sql/hive/src/test/resources/golden/show_columns-2-b74990316ec4245fd8a7011e684b39da
@@ -1,3 +1,3 @@
-key                 
-value               
-ds                  
+KEY
+VALUE
+ds

http://git-wip-us.apache.org/repos/asf/spark/blob/95ec4e25/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
index d8e31c4..b41bc86 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
@@ -105,12 +105,9 @@ class PartitionedTablePerfStatsSuite
         assert(df4.count() == 0)
         assert(df4.inputFiles.length == 0)
 
-        // TODO(ekl) enable for hive tables as well once SPARK-17983 is fixed
-        if (spec.isDatasourceTable) {
-          val df5 = spark.sql("select * from test where fieldOne = 4")
-          assert(df5.count() == 1)
-          assert(df5.inputFiles.length == 5)
-        }
+        val df5 = spark.sql("select * from test where fieldOne = 4")
+        assert(df5.count() == 1)
+        assert(df5.inputFiles.length == 5)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/95ec4e25/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index cc09aef..28e5dff 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -521,7 +521,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     val catalogTable =
       sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
     relation match {
-      case LogicalRelation(r: HadoopFsRelation, _, Some(table)) =>
+      case LogicalRelation(r: HadoopFsRelation, _, _) =>
         if (!isDataSourceTable) {
           fail(
             s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " +
@@ -529,7 +529,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         }
         userSpecifiedLocation match {
           case Some(location) =>
-            assert(table.storage.locationUri.get === location)
+            assert(r.options("path") === location)
           case None => // OK.
         }
         assert(catalogTable.provider.get === format)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org