Posted to commits@spark.apache.org by yh...@apache.org on 2016/08/22 05:23:22 UTC

[2/2] spark git commit: [SPARK-16498][SQL] move hive hack for data source table into HiveExternalCatalog

[SPARK-16498][SQL] move hive hack for data source table into HiveExternalCatalog

## What changes were proposed in this pull request?

Spark SQL doesn't have its own metastore yet and currently uses Hive's. However, the Hive metastore has some limitations (e.g. the number of columns is capped, column names are not case-preserving, decimal type support is poor), so we have some hacks to successfully store data source table metadata in the Hive metastore, i.e. we put all the information in table properties.
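
To make the trick concrete, here is a minimal sketch of how the schema ends up in table properties; the property keys and the JSON splitting mirror the `CreateDataSourceTableUtils` code removed in this diff, while the helper name itself is made up:

```scala
import org.apache.spark.sql.types.StructType

// Sketch only: serialize the schema to JSON, split it into chunks no longer than
// the metastore's per-property length threshold, and store each chunk as a
// numbered table property plus a count of how many chunks there are.
def schemaToTableProperties(schema: StructType, threshold: Int): Map[String, String] = {
  val parts = schema.json.grouped(threshold).toSeq
  val partProps = parts.zipWithIndex.map { case (part, index) =>
    s"spark.sql.sources.schema.part.$index" -> part
  }.toMap
  partProps + ("spark.sql.sources.schema.numParts" -> parts.size.toString)
}
```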

This PR moves these hacks to `HiveExternalCatalog` and tries to isolate Hive-specific logic in one place.

Changes overview:

1. **Before this PR**: we need to put the metadata (schema, partition columns, etc.) of data source tables into table properties before saving it to the external catalog, even if the external catalog doesn't use the Hive metastore (e.g. `InMemoryCatalog`).
**After this PR**: the table properties tricks live only in `HiveExternalCatalog`; the caller side doesn't need to take care of them anymore.

2. **Before this PR**: because the table properties tricks are done outside of the external catalog, we also need to revert these tricks when we read the table metadata from the external catalog and use it, e.g. in `DescribeTableCommand` we read the schema and partition columns from table properties.
**After this PR**: the table metadata read from the external catalog is exactly the same as what we saved to it.

Bonus: now we can create a data source table using `SessionCatalog`, if the schema is specified (see the sketch below).
Breaking changes: `schemaStringLengthThreshold` is no longer configurable, and neither is `hive.default.rcfile.serde`.
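
As an illustration of that bonus, the caller side now just builds a `CatalogTable` and hands it to the session catalog; this mirrors the new code in `createDataSourceTables.scala`, but the table name, schema, and path below are made up:

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.types.StructType

// Hypothetical example: persist a parquet data source table through SessionCatalog.
// The external catalog (HiveExternalCatalog or InMemoryCatalog) now handles any
// metastore-specific property tricks; assumes an active `sparkSession` in scope.
val table = CatalogTable(
  identifier = TableIdentifier("my_table"),
  tableType = CatalogTableType.MANAGED,
  storage = CatalogStorageFormat.empty.copy(properties = Map("path" -> "/tmp/my_table")),
  schema = new StructType().add("id", "int").add("name", "string"),
  provider = Some("parquet"))

sparkSession.sessionState.catalog.createTable(table, ignoreIfExists = false)
```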

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <we...@databricks.com>

Closes #14155 from cloud-fan/catalog-table.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b2074b66
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b2074b66
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b2074b66

Branch: refs/heads/master
Commit: b2074b664a9c269c4103760d40c4a14e7aeb1e83
Parents: 91c2397
Author: Wenchen Fan <we...@databricks.com>
Authored: Sun Aug 21 22:23:14 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Sun Aug 21 22:23:14 2016 -0700

----------------------------------------------------------------------
 .../spark/ml/source/libsvm/LibSVMRelation.scala |   3 +-
 .../spark/sql/execution/SparkSqlParser.scala    |   4 +-
 .../command/createDataSourceTables.scala        | 255 ++------------
 .../spark/sql/execution/command/ddl.scala       |  94 +-----
 .../spark/sql/execution/command/tables.scala    |  59 +---
 .../datasources/DataSourceStrategy.scala        |  22 +-
 .../execution/datasources/WriterContainer.scala |  16 +-
 .../execution/datasources/csv/CSVRelation.scala |   5 +-
 .../datasources/json/JsonFileFormat.scala       |   3 +-
 .../datasources/parquet/ParquetFileFormat.scala |   4 +-
 .../datasources/text/TextFileFormat.scala       |   3 +-
 .../apache/spark/sql/internal/HiveSerDe.scala   |   6 +-
 .../sql/execution/command/DDLCommandSuite.scala |   6 +-
 .../spark/sql/execution/command/DDLSuite.scala  | 110 ++-----
 .../sql/sources/CreateTableAsSelectSuite.scala  |   5 +-
 .../spark/sql/hive/HiveExternalCatalog.scala    | 328 ++++++++++++++++++-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  67 +---
 .../spark/sql/hive/client/HiveClientImpl.scala  |  16 +-
 .../spark/sql/hive/orc/OrcFileFormat.scala      |   3 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala    | 110 ++++---
 .../sql/hive/execution/HiveCommandSuite.scala   |  40 +--
 .../spark/sql/hive/execution/HiveDDLSuite.scala |  23 ++
 .../sql/hive/execution/SQLQuerySuite.scala      |   4 +-
 .../spark/sql/sources/SimpleTextRelation.scala  |   3 +-
 24 files changed, 536 insertions(+), 653 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b2074b66/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
index 034223e..5c79c69 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
@@ -33,7 +33,6 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
-import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -51,7 +50,7 @@ private[libsvm] class LibSVMOutputWriter(
     new TextOutputFormat[NullWritable, Text]() {
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
         val configuration = context.getConfiguration
-        val uniqueWriteJobId = configuration.get(CreateDataSourceTableUtils.DATASOURCE_WRITEJOBUUID)
+        val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
         val taskAttemptId = context.getTaskAttemptID
         val split = taskAttemptId.getTaskID.getId
         new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")

http://git-wip-us.apache.org/repos/asf/spark/blob/b2074b66/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 71c3bd3..e32d301 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -971,7 +971,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     // Storage format
     val defaultStorage: CatalogStorageFormat = {
       val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile")
-      val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType, conf)
+      val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType)
       CatalogStorageFormat(
         locationUri = None,
         inputFormat = defaultHiveSerde.flatMap(_.inputFormat)
@@ -1115,7 +1115,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   override def visitGenericFileFormat(
       ctx: GenericFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
     val source = ctx.identifier.getText
-    HiveSerDe.sourceToSerDe(source, conf) match {
+    HiveSerDe.sourceToSerDe(source) match {
       case Some(s) =>
         CatalogStorageFormat.empty.copy(
           inputFormat = s.inputFormat,

http://git-wip-us.apache.org/repos/asf/spark/blob/b2074b66/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 7b028e7..7400a0e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -17,10 +17,6 @@
 
 package org.apache.spark.sql.execution.command
 
-import scala.collection.mutable
-import scala.util.control.NonFatal
-
-import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
@@ -28,7 +24,6 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.internal.HiveSerDe
 import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
 import org.apache.spark.sql.types._
 
@@ -97,16 +92,19 @@ case class CreateDataSourceTableCommand(
       }
     }
 
-    CreateDataSourceTableUtils.createDataSourceTable(
-      sparkSession = sparkSession,
-      tableIdent = tableIdent,
+    val table = CatalogTable(
+      identifier = tableIdent,
+      tableType = if (isExternal) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED,
+      storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath),
       schema = dataSource.schema,
-      partitionColumns = partitionColumns,
-      bucketSpec = bucketSpec,
-      provider = provider,
-      options = optionsWithPath,
-      isExternal = isExternal)
-
+      provider = Some(provider),
+      partitionColumnNames = partitionColumns,
+      bucketSpec = bucketSpec
+    )
+
+    // We will return Nil or throw exception at the beginning if the table already exists, so when
+    // we reach here, the table should not exist and we should set `ignoreIfExists` to false.
+    sessionState.catalog.createTable(table, ignoreIfExists = false)
     Seq.empty[Row]
   }
 }
@@ -193,7 +191,7 @@ case class CreateDataSourceTableAsSelectCommand(
               }
               existingSchema = Some(l.schema)
             case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
-              existingSchema = Some(DDLUtils.getSchemaFromTableProperties(s.metadata))
+              existingSchema = Some(s.metadata.schema)
             case o =>
               throw new AnalysisException(s"Saving data in ${o.toString} is not supported.")
           }
@@ -233,15 +231,17 @@ case class CreateDataSourceTableAsSelectCommand(
       // We will use the schema of resolved.relation as the schema of the table (instead of
       // the schema of df). It is important since the nullability may be changed by the relation
       // provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
-      CreateDataSourceTableUtils.createDataSourceTable(
-        sparkSession = sparkSession,
-        tableIdent = tableIdent,
-        schema = result.schema,
-        partitionColumns = partitionColumns,
-        bucketSpec = bucketSpec,
-        provider = provider,
-        options = optionsWithPath,
-        isExternal = isExternal)
+      val schema = result.schema
+      val table = CatalogTable(
+        identifier = tableIdent,
+        tableType = if (isExternal) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED,
+        storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath),
+        schema = schema,
+        provider = Some(provider),
+        partitionColumnNames = partitionColumns,
+        bucketSpec = bucketSpec
+      )
+      sessionState.catalog.createTable(table, ignoreIfExists = false)
     }
 
     // Refresh the cache of the table in the catalog.
@@ -249,210 +249,3 @@ case class CreateDataSourceTableAsSelectCommand(
     Seq.empty[Row]
   }
 }
-
-
-object CreateDataSourceTableUtils extends Logging {
-
-  val DATASOURCE_PREFIX = "spark.sql.sources."
-  val DATASOURCE_PROVIDER = DATASOURCE_PREFIX + "provider"
-  val DATASOURCE_WRITEJOBUUID = DATASOURCE_PREFIX + "writeJobUUID"
-  val DATASOURCE_OUTPUTPATH = DATASOURCE_PREFIX + "output.path"
-  val DATASOURCE_SCHEMA = DATASOURCE_PREFIX + "schema"
-  val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "."
-  val DATASOURCE_SCHEMA_NUMPARTS = DATASOURCE_SCHEMA_PREFIX + "numParts"
-  val DATASOURCE_SCHEMA_NUMPARTCOLS = DATASOURCE_SCHEMA_PREFIX + "numPartCols"
-  val DATASOURCE_SCHEMA_NUMSORTCOLS = DATASOURCE_SCHEMA_PREFIX + "numSortCols"
-  val DATASOURCE_SCHEMA_NUMBUCKETS = DATASOURCE_SCHEMA_PREFIX + "numBuckets"
-  val DATASOURCE_SCHEMA_NUMBUCKETCOLS = DATASOURCE_SCHEMA_PREFIX + "numBucketCols"
-  val DATASOURCE_SCHEMA_PART_PREFIX = DATASOURCE_SCHEMA_PREFIX + "part."
-  val DATASOURCE_SCHEMA_PARTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "partCol."
-  val DATASOURCE_SCHEMA_BUCKETCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "bucketCol."
-  val DATASOURCE_SCHEMA_SORTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "sortCol."
-
-  def createDataSourceTable(
-      sparkSession: SparkSession,
-      tableIdent: TableIdentifier,
-      schema: StructType,
-      partitionColumns: Array[String],
-      bucketSpec: Option[BucketSpec],
-      provider: String,
-      options: Map[String, String],
-      isExternal: Boolean): Unit = {
-    val tableProperties = new mutable.HashMap[String, String]
-    tableProperties.put(DATASOURCE_PROVIDER, provider)
-
-    // Serialized JSON schema string may be too long to be stored into a single metastore table
-    // property. In this case, we split the JSON string and store each part as a separate table
-    // property.
-    val threshold = sparkSession.sessionState.conf.schemaStringLengthThreshold
-    val schemaJsonString = schema.json
-    // Split the JSON string.
-    val parts = schemaJsonString.grouped(threshold).toSeq
-    tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
-    parts.zipWithIndex.foreach { case (part, index) =>
-      tableProperties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
-    }
-
-    if (partitionColumns.length > 0) {
-      tableProperties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString)
-      partitionColumns.zipWithIndex.foreach { case (partCol, index) =>
-        tableProperties.put(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index", partCol)
-      }
-    }
-
-    if (bucketSpec.isDefined) {
-      val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get
-
-      tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETS, numBuckets.toString)
-      tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETCOLS, bucketColumnNames.length.toString)
-      bucketColumnNames.zipWithIndex.foreach { case (bucketCol, index) =>
-        tableProperties.put(s"$DATASOURCE_SCHEMA_BUCKETCOL_PREFIX$index", bucketCol)
-      }
-
-      if (sortColumnNames.nonEmpty) {
-        tableProperties.put(DATASOURCE_SCHEMA_NUMSORTCOLS, sortColumnNames.length.toString)
-        sortColumnNames.zipWithIndex.foreach { case (sortCol, index) =>
-          tableProperties.put(s"$DATASOURCE_SCHEMA_SORTCOL_PREFIX$index", sortCol)
-        }
-      }
-    }
-
-    val tableType = if (isExternal) {
-      tableProperties.put("EXTERNAL", "TRUE")
-      CatalogTableType.EXTERNAL
-    } else {
-      tableProperties.put("EXTERNAL", "FALSE")
-      CatalogTableType.MANAGED
-    }
-
-    val maybeSerDe = HiveSerDe.sourceToSerDe(provider, sparkSession.sessionState.conf)
-    val dataSource =
-      DataSource(
-        sparkSession,
-        userSpecifiedSchema = Some(schema),
-        partitionColumns = partitionColumns,
-        bucketSpec = bucketSpec,
-        className = provider,
-        options = options)
-
-    def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
-      CatalogTable(
-        identifier = tableIdent,
-        tableType = tableType,
-        schema = new StructType,
-        provider = Some(provider),
-        storage = CatalogStorageFormat(
-          locationUri = None,
-          inputFormat = None,
-          outputFormat = None,
-          serde = None,
-          compressed = false,
-          properties = options
-        ),
-        properties = tableProperties.toMap)
-    }
-
-    def newHiveCompatibleMetastoreTable(
-        relation: HadoopFsRelation,
-        serde: HiveSerDe): CatalogTable = {
-      assert(partitionColumns.isEmpty)
-      assert(relation.partitionSchema.isEmpty)
-
-      CatalogTable(
-        identifier = tableIdent,
-        tableType = tableType,
-        storage = CatalogStorageFormat(
-          locationUri = Some(relation.location.paths.map(_.toUri.toString).head),
-          inputFormat = serde.inputFormat,
-          outputFormat = serde.outputFormat,
-          serde = serde.serde,
-          compressed = false,
-          properties = options
-        ),
-        schema = relation.schema,
-        provider = Some(provider),
-        properties = tableProperties.toMap,
-        viewText = None)
-    }
-
-    // TODO: Support persisting partitioned data source relations in Hive compatible format
-    val qualifiedTableName = tableIdent.quotedString
-    val skipHiveMetadata = options.getOrElse("skipHiveMetadata", "false").toBoolean
-    val resolvedRelation = dataSource.resolveRelation(checkPathExist = false)
-    val (hiveCompatibleTable, logMessage) = (maybeSerDe, resolvedRelation) match {
-      case _ if skipHiveMetadata =>
-        val message =
-          s"Persisting partitioned data source relation $qualifiedTableName into " +
-            "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive."
-        (None, message)
-
-      case (Some(serde), relation: HadoopFsRelation) if relation.location.paths.length == 1 &&
-        relation.partitionSchema.isEmpty && relation.bucketSpec.isEmpty =>
-        val hiveTable = newHiveCompatibleMetastoreTable(relation, serde)
-        val message =
-          s"Persisting data source relation $qualifiedTableName with a single input path " +
-            s"into Hive metastore in Hive compatible format. Input path: " +
-            s"${relation.location.paths.head}."
-        (Some(hiveTable), message)
-
-      case (Some(serde), relation: HadoopFsRelation) if relation.partitionSchema.nonEmpty =>
-        val message =
-          s"Persisting partitioned data source relation $qualifiedTableName into " +
-            "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " +
-            "Input path(s): " + relation.location.paths.mkString("\n", "\n", "")
-        (None, message)
-
-      case (Some(serde), relation: HadoopFsRelation) if relation.bucketSpec.nonEmpty =>
-        val message =
-          s"Persisting bucketed data source relation $qualifiedTableName into " +
-            "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " +
-            "Input path(s): " + relation.location.paths.mkString("\n", "\n", "")
-        (None, message)
-
-      case (Some(serde), relation: HadoopFsRelation) =>
-        val message =
-          s"Persisting data source relation $qualifiedTableName with multiple input paths into " +
-            "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " +
-            s"Input paths: " + relation.location.paths.mkString("\n", "\n", "")
-        (None, message)
-
-      case (Some(serde), _) =>
-        val message =
-          s"Data source relation $qualifiedTableName is not a " +
-            s"${classOf[HadoopFsRelation].getSimpleName}. Persisting it into Hive metastore " +
-            "in Spark SQL specific format, which is NOT compatible with Hive."
-        (None, message)
-
-      case _ =>
-        val message =
-          s"Couldn't find corresponding Hive SerDe for data source provider $provider. " +
-            s"Persisting data source relation $qualifiedTableName into Hive metastore in " +
-            s"Spark SQL specific format, which is NOT compatible with Hive."
-        (None, message)
-    }
-
-    (hiveCompatibleTable, logMessage) match {
-      case (Some(table), message) =>
-        // We first try to save the metadata of the table in a Hive compatible way.
-        // If Hive throws an error, we fall back to save its metadata in the Spark SQL
-        // specific way.
-        try {
-          logInfo(message)
-          sparkSession.sessionState.catalog.createTable(table, ignoreIfExists = false)
-        } catch {
-          case NonFatal(e) =>
-            val warningMessage =
-              s"Could not persist $qualifiedTableName in a Hive compatible way. Persisting " +
-                s"it into Hive metastore in Spark SQL specific format."
-            logWarning(warningMessage, e)
-            val table = newSparkSQLSpecificMetastoreTable()
-            sparkSession.sessionState.catalog.createTable(table, ignoreIfExists = false)
-        }
-
-      case (None, message) =>
-        logWarning(message)
-        val table = newSparkSQLSpecificMetastoreTable()
-        sparkSession.sessionState.catalog.createTable(table, ignoreIfExists = false)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/b2074b66/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 2eff933..3817f91 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -27,10 +27,9 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogTable, CatalogTablePartition, CatalogTableType, SessionCatalog}
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes._
+import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTablePartition, CatalogTableType, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
 import org.apache.spark.sql.types._
 
@@ -234,10 +233,8 @@ case class AlterTableSetPropertiesCommand(
   extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    val ident = if (isView) "VIEW" else "TABLE"
     val catalog = sparkSession.sessionState.catalog
     DDLUtils.verifyAlterTableType(catalog, tableName, isView)
-    DDLUtils.verifyTableProperties(properties.keys.toSeq, s"ALTER $ident")
     val table = catalog.getTableMetadata(tableName)
     // This overrides old properties
     val newTable = table.copy(properties = table.properties ++ properties)
@@ -264,10 +261,8 @@ case class AlterTableUnsetPropertiesCommand(
   extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    val ident = if (isView) "VIEW" else "TABLE"
     val catalog = sparkSession.sessionState.catalog
     DDLUtils.verifyAlterTableType(catalog, tableName, isView)
-    DDLUtils.verifyTableProperties(propKeys, s"ALTER $ident")
     val table = catalog.getTableMetadata(tableName)
     if (!ifExists) {
       propKeys.foreach { k =>
@@ -445,11 +440,11 @@ case class AlterTableRecoverPartitionsCommand(
     if (!catalog.tableExists(tableName)) {
       throw new AnalysisException(s"Table $tableName in $cmd does not exist.")
     }
-    val table = catalog.getTableMetadata(tableName)
     if (catalog.isTemporaryTable(tableName)) {
       throw new AnalysisException(
         s"Operation not allowed: $cmd on temporary tables: $tableName")
     }
+    val table = catalog.getTableMetadata(tableName)
     if (DDLUtils.isDatasourceTable(table)) {
       throw new AnalysisException(
         s"Operation not allowed: $cmd on datasource tables: $tableName")
@@ -458,7 +453,7 @@ case class AlterTableRecoverPartitionsCommand(
       throw new AnalysisException(
         s"Operation not allowed: $cmd only works on external tables: $tableName")
     }
-    if (!DDLUtils.isTablePartitioned(table)) {
+    if (table.partitionColumnNames.isEmpty) {
       throw new AnalysisException(
         s"Operation not allowed: $cmd only works on partitioned tables: $tableName")
     }
@@ -584,13 +579,8 @@ case class AlterTableSetLocationCommand(
 
 
 object DDLUtils {
-
-  def isDatasourceTable(props: Map[String, String]): Boolean = {
-    props.contains(DATASOURCE_PROVIDER)
-  }
-
   def isDatasourceTable(table: CatalogTable): Boolean = {
-    isDatasourceTable(table.properties)
+    table.provider.isDefined && table.provider.get != "hive"
   }
 
   /**
@@ -611,78 +601,4 @@ object DDLUtils {
       case _ =>
     })
   }
-
-  /**
-   * If the given table properties (or SerDe properties) contains datasource properties,
-   * throw an exception.
-   */
-  def verifyTableProperties(propKeys: Seq[String], operation: String): Unit = {
-    val datasourceKeys = propKeys.filter(_.startsWith(DATASOURCE_PREFIX))
-    if (datasourceKeys.nonEmpty) {
-      throw new AnalysisException(s"Operation not allowed: $operation property keys may not " +
-        s"start with '$DATASOURCE_PREFIX': ${datasourceKeys.mkString("[", ", ", "]")}")
-    }
-  }
-
-  def isTablePartitioned(table: CatalogTable): Boolean = {
-    table.partitionColumnNames.nonEmpty || table.properties.contains(DATASOURCE_SCHEMA_NUMPARTCOLS)
-  }
-
-  // A persisted data source table always store its schema in the catalog.
-  def getSchemaFromTableProperties(metadata: CatalogTable): StructType = {
-    require(isDatasourceTable(metadata))
-    val msgSchemaCorrupted = "Could not read schema from the metastore because it is corrupted."
-    val props = metadata.properties
-    props.get(DATASOURCE_SCHEMA).map { schema =>
-      // Originally, we used spark.sql.sources.schema to store the schema of a data source table.
-      // After SPARK-6024, we removed this flag.
-      // Although we are not using spark.sql.sources.schema any more, we need to still support.
-      DataType.fromJson(schema).asInstanceOf[StructType]
-    } getOrElse {
-      props.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts =>
-        val parts = (0 until numParts.toInt).map { index =>
-          val part = metadata.properties.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index").orNull
-          if (part == null) {
-            throw new AnalysisException(msgSchemaCorrupted +
-              s" (missing part $index of the schema, $numParts parts are expected).")
-          }
-          part
-        }
-        // Stick all parts back to a single schema string.
-        DataType.fromJson(parts.mkString).asInstanceOf[StructType]
-      } getOrElse(throw new AnalysisException(msgSchemaCorrupted))
-    }
-  }
-
-  private def getColumnNamesByType(
-      props: Map[String, String], colType: String, typeName: String): Seq[String] = {
-    require(isDatasourceTable(props))
-
-    for {
-      numCols <- props.get(s"spark.sql.sources.schema.num${colType.capitalize}Cols").toSeq
-      index <- 0 until numCols.toInt
-    } yield props.getOrElse(
-      s"$DATASOURCE_SCHEMA_PREFIX${colType}Col.$index",
-      throw new AnalysisException(
-        s"Corrupted $typeName in catalog: $numCols parts expected, but part $index is missing."
-      )
-    )
-  }
-
-  def getPartitionColumnsFromTableProperties(metadata: CatalogTable): Seq[String] = {
-    getColumnNamesByType(metadata.properties, "part", "partitioning columns")
-  }
-
-  def getBucketSpecFromTableProperties(metadata: CatalogTable): Option[BucketSpec] = {
-    if (isDatasourceTable(metadata)) {
-      metadata.properties.get(DATASOURCE_SCHEMA_NUMBUCKETS).map { numBuckets =>
-        BucketSpec(
-          numBuckets.toInt,
-          getColumnNamesByType(metadata.properties, "bucket", "bucketing columns"),
-          getColumnNamesByType(metadata.properties, "sort", "sorting columns"))
-      }
-    } else {
-      None
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b2074b66/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 720399e..af2b5ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -119,11 +119,9 @@ case class CreateTableLikeCommand(
 case class CreateTableCommand(table: CatalogTable, ifNotExists: Boolean) extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    DDLUtils.verifyTableProperties(table.properties.keys.toSeq, "CREATE TABLE")
     sparkSession.sessionState.catalog.createTable(table, ifNotExists)
     Seq.empty[Row]
   }
-
 }
 
 
@@ -414,8 +412,8 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
       describeSchema(catalog.lookupRelation(table).schema, result)
     } else {
       val metadata = catalog.getTableMetadata(table)
+      describeSchema(metadata.schema, result)
 
-      describeSchema(metadata, result)
       if (isExtended) {
         describeExtended(metadata, result)
       } else if (isFormatted) {
@@ -429,20 +427,10 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
   }
 
   private def describePartitionInfo(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
-    if (DDLUtils.isDatasourceTable(table)) {
-      val partColNames = DDLUtils.getPartitionColumnsFromTableProperties(table)
-      if (partColNames.nonEmpty) {
-        val userSpecifiedSchema = DDLUtils.getSchemaFromTableProperties(table)
-        append(buffer, "# Partition Information", "", "")
-        append(buffer, s"# ${output.head.name}", output(1).name, output(2).name)
-        describeSchema(StructType(partColNames.map(userSpecifiedSchema(_))), buffer)
-      }
-    } else {
-      if (table.partitionColumnNames.nonEmpty) {
-        append(buffer, "# Partition Information", "", "")
-        append(buffer, s"# ${output.head.name}", output(1).name, output(2).name)
-        describeSchema(table.partitionSchema, buffer)
-      }
+    if (table.partitionColumnNames.nonEmpty) {
+      append(buffer, "# Partition Information", "", "")
+      append(buffer, s"# ${output.head.name}", output(1).name, output(2).name)
+      describeSchema(table.partitionSchema, buffer)
     }
   }
 
@@ -466,11 +454,7 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
     append(buffer, "Table Type:", table.tableType.name, "")
 
     append(buffer, "Table Parameters:", "", "")
-    table.properties.filterNot {
-      // Hides schema properties that hold user-defined schema, partition columns, and bucketing
-      // information since they are already extracted and shown in other parts.
-      case (key, _) => key.startsWith(CreateDataSourceTableUtils.DATASOURCE_SCHEMA)
-    }.foreach { case (key, value) =>
+    table.properties.foreach { case (key, value) =>
       append(buffer, s"  $key", value, "")
     }
 
@@ -493,7 +477,7 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
   }
 
   private def describeBucketingInfo(metadata: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
-    def appendBucketInfo(bucketSpec: Option[BucketSpec]) = bucketSpec match {
+    metadata.bucketSpec match {
       case Some(BucketSpec(numBuckets, bucketColumnNames, sortColumnNames)) =>
         append(buffer, "Num Buckets:", numBuckets.toString, "")
         append(buffer, "Bucket Columns:", bucketColumnNames.mkString("[", ", ", "]"), "")
@@ -501,23 +485,6 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
 
       case _ =>
     }
-
-    if (DDLUtils.isDatasourceTable(metadata)) {
-      appendBucketInfo(DDLUtils.getBucketSpecFromTableProperties(metadata))
-    } else {
-      appendBucketInfo(metadata.bucketSpec)
-    }
-  }
-
-  private def describeSchema(
-      tableDesc: CatalogTable,
-      buffer: ArrayBuffer[Row]): Unit = {
-    if (DDLUtils.isDatasourceTable(tableDesc)) {
-      val schema = DDLUtils.getSchemaFromTableProperties(tableDesc)
-      describeSchema(schema, buffer)
-    } else {
-      describeSchema(tableDesc.schema, buffer)
-    }
   }
 
   private def describeSchema(schema: StructType, buffer: ArrayBuffer[Row]): Unit = {
@@ -678,7 +645,7 @@ case class ShowPartitionsCommand(
         s"SHOW PARTITIONS is not allowed on a view or index table: ${tab.qualifiedName}")
     }
 
-    if (!DDLUtils.isTablePartitioned(tab)) {
+    if (tab.partitionColumnNames.isEmpty) {
       throw new AnalysisException(
         s"SHOW PARTITIONS is not allowed on a table that is not partitioned: ${tab.qualifiedName}")
     }
@@ -729,6 +696,7 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
 
     val tableMetadata = catalog.getTableMetadata(table)
 
+    // TODO: unify this after we unify the CREATE TABLE syntax for hive serde and data source table.
     val stmt = if (DDLUtils.isDatasourceTable(tableMetadata)) {
       showCreateDataSourceTable(tableMetadata)
     } else {
@@ -872,15 +840,14 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
 
   private def showDataSourceTableDataColumns(
       metadata: CatalogTable, builder: StringBuilder): Unit = {
-    val schema = DDLUtils.getSchemaFromTableProperties(metadata)
-    val columns = schema.fields.map(f => s"${quoteIdentifier(f.name)} ${f.dataType.sql}")
+    val columns = metadata.schema.fields.map(f => s"${quoteIdentifier(f.name)} ${f.dataType.sql}")
     builder ++= columns.mkString("(", ", ", ")\n")
   }
 
   private def showDataSourceTableOptions(metadata: CatalogTable, builder: StringBuilder): Unit = {
     val props = metadata.properties
 
-    builder ++= s"USING ${props(CreateDataSourceTableUtils.DATASOURCE_PROVIDER)}\n"
+    builder ++= s"USING ${metadata.provider.get}\n"
 
     val dataSourceOptions = metadata.storage.properties.filterNot {
       case (key, value) =>
@@ -900,12 +867,12 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
 
   private def showDataSourceTableNonDataColumns(
       metadata: CatalogTable, builder: StringBuilder): Unit = {
-    val partCols = DDLUtils.getPartitionColumnsFromTableProperties(metadata)
+    val partCols = metadata.partitionColumnNames
     if (partCols.nonEmpty) {
       builder ++= s"PARTITIONED BY ${partCols.mkString("(", ", ", ")")}\n"
     }
 
-    DDLUtils.getBucketSpecFromTableProperties(metadata).foreach { spec =>
+    metadata.bucketSpec.foreach { spec =>
       if (spec.bucketColumnNames.nonEmpty) {
         builder ++= s"CLUSTERED BY ${spec.bucketColumnNames.mkString("(", ", ", ")")}\n"
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b2074b66/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 733ba18..5eba7df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPartitioning}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
-import org.apache.spark.sql.execution.command.{CreateDataSourceTableUtils, DDLUtils, ExecutedCommandExec}
+import org.apache.spark.sql.execution.command.{DDLUtils, ExecutedCommandExec}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -204,24 +204,14 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
  */
 class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
   private def readDataSourceTable(sparkSession: SparkSession, table: CatalogTable): LogicalPlan = {
-    val schema = DDLUtils.getSchemaFromTableProperties(table)
-
-    // We only need names at here since userSpecifiedSchema we loaded from the metastore
-    // contains partition columns. We can always get datatypes of partitioning columns
-    // from userSpecifiedSchema.
-    val partitionColumns = DDLUtils.getPartitionColumnsFromTableProperties(table)
-
-    val bucketSpec = DDLUtils.getBucketSpecFromTableProperties(table)
-
-    val options = table.storage.properties
     val dataSource =
       DataSource(
         sparkSession,
-        userSpecifiedSchema = Some(schema),
-        partitionColumns = partitionColumns,
-        bucketSpec = bucketSpec,
-        className = table.properties(CreateDataSourceTableUtils.DATASOURCE_PROVIDER),
-        options = options)
+        userSpecifiedSchema = Some(table.schema),
+        partitionColumns = table.partitionColumnNames,
+        bucketSpec = table.bucketSpec,
+        className = table.provider.get,
+        options = table.storage.properties)
 
     LogicalRelation(
       dataSource.resolveRelation(),

http://git-wip-us.apache.org/repos/asf/spark/blob/b2074b66/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 447c237..7880c7c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -33,7 +33,6 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.UnsafeKVExternalSorter
-import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
 import org.apache.spark.util.{SerializableConfiguration, Utils}
@@ -48,6 +47,11 @@ private[datasources] case class WriteRelation(
     prepareJobForWrite: Job => OutputWriterFactory,
     bucketSpec: Option[BucketSpec])
 
+object WriterContainer {
+  val DATASOURCE_WRITEJOBUUID = "spark.sql.sources.writeJobUUID"
+  val DATASOURCE_OUTPUTPATH = "spark.sql.sources.output.path"
+}
+
 private[datasources] abstract class BaseWriterContainer(
     @transient val relation: WriteRelation,
     @transient private val job: Job,
@@ -94,7 +98,7 @@ private[datasources] abstract class BaseWriterContainer(
     // This UUID is sent to executor side together with the serialized `Configuration` object within
     // the `Job` instance.  `OutputWriters` on the executor side should use this UUID to generate
     // unique task output files.
-    job.getConfiguration.set(DATASOURCE_WRITEJOBUUID, uniqueWriteJobId.toString)
+    job.getConfiguration.set(WriterContainer.DATASOURCE_WRITEJOBUUID, uniqueWriteJobId.toString)
 
     // Order of the following two lines is important.  For Hadoop 1, TaskAttemptContext constructor
     // clones the Configuration object passed in.  If we initialize the TaskAttemptContext first,
@@ -244,7 +248,7 @@ private[datasources] class DefaultWriterContainer(
   def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
     executorSideSetup(taskContext)
     val configuration = taskAttemptContext.getConfiguration
-    configuration.set(DATASOURCE_OUTPUTPATH, outputPath)
+    configuration.set(WriterContainer.DATASOURCE_OUTPUTPATH, outputPath)
     var writer = newOutputWriter(getWorkPath)
     writer.initConverter(dataSchema)
 
@@ -352,10 +356,12 @@ private[datasources] class DynamicPartitionWriterContainer(
     val configuration = taskAttemptContext.getConfiguration
     val path = if (partitionColumns.nonEmpty) {
       val partitionPath = getPartitionString(key).getString(0)
-      configuration.set(DATASOURCE_OUTPUTPATH, new Path(outputPath, partitionPath).toString)
+      configuration.set(
+        WriterContainer.DATASOURCE_OUTPUTPATH,
+        new Path(outputPath, partitionPath).toString)
       new Path(getWorkPath, partitionPath).toString
     } else {
-      configuration.set(DATASOURCE_OUTPUTPATH, outputPath)
+      configuration.set(WriterContainer.DATASOURCE_OUTPUTPATH, outputPath)
       getWorkPath
     }
     val bucketId = getBucketIdFromKey(key)

http://git-wip-us.apache.org/repos/asf/spark/blob/b2074b66/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
index 6b2f9fc..de2d633 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
@@ -30,8 +30,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
-import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
-import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory, PartitionedFile}
+import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory, PartitionedFile, WriterContainer}
 import org.apache.spark.sql.types._
 
 object CSVRelation extends Logging {
@@ -192,7 +191,7 @@ private[csv] class CsvOutputWriter(
     new TextOutputFormat[NullWritable, Text]() {
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
         val configuration = context.getConfiguration
-        val uniqueWriteJobId = configuration.get(CreateDataSourceTableUtils.DATASOURCE_WRITEJOBUUID)
+        val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
         val taskAttemptId = context.getTaskAttemptID
         val split = taskAttemptId.getTaskID.getId
         new Path(path, f"part-r-$split%05d-$uniqueWriteJobId.csv$extension")

http://git-wip-us.apache.org/repos/asf/spark/blob/b2074b66/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 27910e2..16150b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -31,7 +31,6 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
@@ -164,7 +163,7 @@ private[json] class JsonOutputWriter(
     new TextOutputFormat[NullWritable, Text]() {
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
         val configuration = context.getConfiguration
-        val uniqueWriteJobId = configuration.get(CreateDataSourceTableUtils.DATASOURCE_WRITEJOBUUID)
+        val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
         val taskAttemptId = context.getTaskAttemptID
         val split = taskAttemptId.getTaskID.getId
         val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")

http://git-wip-us.apache.org/repos/asf/spark/blob/b2074b66/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 9c4778a..9208c82 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -44,7 +44,6 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
-import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.internal.SQLConf
@@ -547,8 +546,7 @@ private[parquet] class ParquetOutputWriter(
         //     partitions in the case of dynamic partitioning.
         override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
           val configuration = context.getConfiguration
-          val uniqueWriteJobId = configuration.get(
-            CreateDataSourceTableUtils.DATASOURCE_WRITEJOBUUID)
+          val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
           val taskAttemptId = context.getTaskAttemptID
           val split = taskAttemptId.getTaskID.getId
           val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")

http://git-wip-us.apache.org/repos/asf/spark/blob/b2074b66/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index abb6059..a0c3fd5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
-import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{StringType, StructType}
@@ -131,7 +130,7 @@ class TextOutputWriter(path: String, dataSchema: StructType, context: TaskAttemp
     new TextOutputFormat[NullWritable, Text]() {
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
         val configuration = context.getConfiguration
-        val uniqueWriteJobId = configuration.get(CreateDataSourceTableUtils.DATASOURCE_WRITEJOBUUID)
+        val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
         val taskAttemptId = context.getTaskAttemptID
         val split = taskAttemptId.getTaskID.getId
         new Path(path, f"part-r-$split%05d-$uniqueWriteJobId.txt$extension")

http://git-wip-us.apache.org/repos/asf/spark/blob/b2074b66/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
index ad69137..52e648a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
@@ -28,10 +28,9 @@ object HiveSerDe {
    *
    * @param source Currently the source abbreviation can be one of the following:
    *               SequenceFile, RCFile, ORC, PARQUET, and case insensitive.
-   * @param conf SQLConf
    * @return HiveSerDe associated with the specified source
    */
-  def sourceToSerDe(source: String, conf: SQLConf): Option[HiveSerDe] = {
+  def sourceToSerDe(source: String): Option[HiveSerDe] = {
     val serdeMap = Map(
       "sequencefile" ->
         HiveSerDe(
@@ -42,8 +41,7 @@ object HiveSerDe {
         HiveSerDe(
           inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
           outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"),
-          serde = Option(conf.getConfString("hive.default.rcfile.serde",
-            "org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"))),
+          serde = Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe")),
 
       "orc" ->
         HiveSerDe(

http://git-wip-us.apache.org/repos/asf/spark/blob/b2074b66/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index be1bccb..8dd883b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -243,7 +243,7 @@ class DDLCommandSuite extends PlanTest {
     allSources.foreach { s =>
       val query = s"CREATE TABLE my_tab STORED AS $s"
       val ct = parseAs[CreateTable](query)
-      val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf)
+      val hiveSerde = HiveSerDe.sourceToSerDe(s)
       assert(hiveSerde.isDefined)
       assert(ct.tableDesc.storage.serde == hiveSerde.get.serde)
       assert(ct.tableDesc.storage.inputFormat == hiveSerde.get.inputFormat)
@@ -276,7 +276,7 @@ class DDLCommandSuite extends PlanTest {
       val query = s"CREATE TABLE my_tab ROW FORMAT SERDE 'anything' STORED AS $s"
       if (supportedSources.contains(s)) {
         val ct = parseAs[CreateTable](query)
-        val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf)
+        val hiveSerde = HiveSerDe.sourceToSerDe(s)
         assert(hiveSerde.isDefined)
         assert(ct.tableDesc.storage.serde == Some("anything"))
         assert(ct.tableDesc.storage.inputFormat == hiveSerde.get.inputFormat)
@@ -295,7 +295,7 @@ class DDLCommandSuite extends PlanTest {
       val query = s"CREATE TABLE my_tab ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' STORED AS $s"
       if (supportedSources.contains(s)) {
         val ct = parseAs[CreateTable](query)
-        val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf)
+        val hiveSerde = HiveSerDe.sourceToSerDe(s)
         assert(hiveSerde.isDefined)
         assert(ct.tableDesc.storage.serde == hiveSerde.get.serde)
         assert(ct.tableDesc.storage.inputFormat == hiveSerde.get.inputFormat)

http://git-wip-us.apache.org/repos/asf/spark/blob/b2074b66/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 0f7fda7..e6ae422 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, Catal
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -93,7 +92,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         .add("col2", "string")
         .add("a", "int")
         .add("b", "int"),
-      provider = Some("parquet"),
+      provider = Some("hive"),
       partitionColumnNames = Seq("a", "b"),
       createTime = 0L)
   }
@@ -277,10 +276,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
          """.stripMargin)
       val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName))
 
-      assert(expectedSchema ==
-        DDLUtils.getSchemaFromTableProperties(tableMetadata))
-      assert(expectedPartitionCols ==
-        DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata))
+      assert(expectedSchema == tableMetadata.schema)
+      assert(expectedPartitionCols == tableMetadata.partitionColumnNames)
     }
   }
 
@@ -399,41 +396,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     assert(e.message == "Found duplicate column(s) in bucket: a")
   }
 
-  test("Describe Table with Corrupted Schema") {
-    import testImplicits._
-
-    val tabName = "tab1"
-    withTempPath { dir =>
-      val path = dir.getCanonicalPath
-      val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("col1", "col2")
-      df.write.format("json").save(path)
-      val uri = dir.toURI
-
-      withTable(tabName) {
-        sql(
-          s"""
-             |CREATE TABLE $tabName
-             |USING json
-             |OPTIONS (
-             |  path '$uri'
-             |)
-           """.stripMargin)
-
-        val catalog = spark.sessionState.catalog
-        val table = catalog.getTableMetadata(TableIdentifier(tabName))
-        val newProperties = table.properties.filterKeys(key =>
-          key != CreateDataSourceTableUtils.DATASOURCE_SCHEMA_NUMPARTS)
-        val newTable = table.copy(properties = newProperties)
-        catalog.alterTable(newTable)
-
-        val e = intercept[AnalysisException] {
-          sql(s"DESC $tabName")
-        }.getMessage
-        assert(e.contains(s"Could not read schema from the metastore because it is corrupted"))
-      }
-    }
-  }
-
   test("Refresh table after changing the data source table partitioning") {
     import testImplicits._
 
@@ -460,10 +422,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
              |)
            """.stripMargin)
         val tableMetadata = catalog.getTableMetadata(TableIdentifier(tabName))
-        val tableSchema = DDLUtils.getSchemaFromTableProperties(tableMetadata)
-        assert(tableSchema == schema)
-        val partCols = DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata)
-        assert(partCols == partitionCols)
+        assert(tableMetadata.schema == schema)
+        assert(tableMetadata.partitionColumnNames == partitionCols)
 
         // Change the schema
         val newDF = sparkContext.parallelize(1 to 10).map(i => (i, i.toString))
@@ -472,23 +432,15 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
 
         // No change on the schema
         val tableMetadataBeforeRefresh = catalog.getTableMetadata(TableIdentifier(tabName))
-        val tableSchemaBeforeRefresh =
-          DDLUtils.getSchemaFromTableProperties(tableMetadataBeforeRefresh)
-        assert(tableSchemaBeforeRefresh == schema)
-        val partColsBeforeRefresh =
-          DDLUtils.getPartitionColumnsFromTableProperties(tableMetadataBeforeRefresh)
-        assert(partColsBeforeRefresh == partitionCols)
+        assert(tableMetadataBeforeRefresh.schema == schema)
+        assert(tableMetadataBeforeRefresh.partitionColumnNames == partitionCols)
 
         // Refresh does not affect the schema
         spark.catalog.refreshTable(tabName)
 
         val tableMetadataAfterRefresh = catalog.getTableMetadata(TableIdentifier(tabName))
-        val tableSchemaAfterRefresh =
-          DDLUtils.getSchemaFromTableProperties(tableMetadataAfterRefresh)
-        assert(tableSchemaAfterRefresh == schema)
-        val partColsAfterRefresh =
-          DDLUtils.getPartitionColumnsFromTableProperties(tableMetadataAfterRefresh)
-        assert(partColsAfterRefresh == partitionCols)
+        assert(tableMetadataAfterRefresh.schema == schema)
+        assert(tableMetadataAfterRefresh.partitionColumnNames == partitionCols)
       }
     }
   }
@@ -641,7 +593,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       val table = catalog.getTableMetadata(TableIdentifier("tbl"))
       assert(table.tableType == CatalogTableType.MANAGED)
       assert(table.schema == new StructType().add("a", "int").add("b", "int"))
-      assert(table.properties(DATASOURCE_PROVIDER) == "parquet")
+      assert(table.provider == Some("parquet"))
     }
   }
 
@@ -651,12 +603,9 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       sql("CREATE TABLE tbl(a INT, b INT) USING parquet PARTITIONED BY (a)")
       val table = catalog.getTableMetadata(TableIdentifier("tbl"))
       assert(table.tableType == CatalogTableType.MANAGED)
-      assert(table.schema.isEmpty) // partitioned datasource table is not hive-compatible
-      assert(table.properties(DATASOURCE_PROVIDER) == "parquet")
-      assert(DDLUtils.getSchemaFromTableProperties(table) ==
-        new StructType().add("a", IntegerType).add("b", IntegerType))
-      assert(DDLUtils.getPartitionColumnsFromTableProperties(table) ==
-        Seq("a"))
+      assert(table.provider == Some("parquet"))
+      assert(table.schema == new StructType().add("a", IntegerType).add("b", IntegerType))
+      assert(table.partitionColumnNames == Seq("a"))
     }
   }
 
@@ -667,12 +616,9 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         "CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS")
       val table = catalog.getTableMetadata(TableIdentifier("tbl"))
       assert(table.tableType == CatalogTableType.MANAGED)
-      assert(table.schema.isEmpty) // partitioned datasource table is not hive-compatible
-      assert(table.properties(DATASOURCE_PROVIDER) == "parquet")
-      assert(DDLUtils.getSchemaFromTableProperties(table) ==
-        new StructType().add("a", IntegerType).add("b", IntegerType))
-      assert(DDLUtils.getBucketSpecFromTableProperties(table) ==
-        Some(BucketSpec(5, Seq("a"), Seq("b"))))
+      assert(table.provider == Some("parquet"))
+      assert(table.schema == new StructType().add("a", IntegerType).add("b", IntegerType))
+      assert(table.bucketSpec == Some(BucketSpec(5, Seq("a"), Seq("b"))))
     }
   }
 
@@ -1096,7 +1042,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       catalog: SessionCatalog,
       tableIdent: TableIdentifier): Unit = {
     catalog.alterTable(catalog.getTableMetadata(tableIdent).copy(
-      properties = Map(DATASOURCE_PROVIDER -> "csv")))
+      provider = Some("csv")))
   }
 
   private def testSetProperties(isDatasourceTable: Boolean): Unit = {
@@ -1108,9 +1054,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       convertToDatasourceTable(catalog, tableIdent)
     }
     def getProps: Map[String, String] = {
-      catalog.getTableMetadata(tableIdent).properties.filterKeys { k =>
-        !isDatasourceTable || !k.startsWith(DATASOURCE_PREFIX)
-      }
+      catalog.getTableMetadata(tableIdent).properties
     }
     assert(getProps.isEmpty)
     // set table properties
@@ -1124,11 +1068,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     intercept[AnalysisException] {
       sql("ALTER TABLE does_not_exist SET TBLPROPERTIES ('winner' = 'loser')")
     }
-    // datasource table property keys are not allowed
-    val e = intercept[AnalysisException] {
-      sql(s"ALTER TABLE tab1 SET TBLPROPERTIES ('${DATASOURCE_PREFIX}foo' = 'loser')")
-    }
-    assert(e.getMessage.contains(DATASOURCE_PREFIX + "foo"))
   }
 
   private def testUnsetProperties(isDatasourceTable: Boolean): Unit = {
@@ -1140,9 +1079,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       convertToDatasourceTable(catalog, tableIdent)
     }
     def getProps: Map[String, String] = {
-      catalog.getTableMetadata(tableIdent).properties.filterKeys { k =>
-        !isDatasourceTable || !k.startsWith(DATASOURCE_PREFIX)
-      }
+      catalog.getTableMetadata(tableIdent).properties
     }
     // unset table properties
     sql("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('j' = 'am', 'p' = 'an', 'c' = 'lan', 'x' = 'y')")
@@ -1164,11 +1101,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     // property to unset does not exist, but "IF EXISTS" is specified
     sql("ALTER TABLE tab1 UNSET TBLPROPERTIES IF EXISTS ('c', 'xyz')")
     assert(getProps == Map("x" -> "y"))
-    // datasource table property keys are not allowed
-    val e2 = intercept[AnalysisException] {
-      sql(s"ALTER TABLE tab1 UNSET TBLPROPERTIES ('${DATASOURCE_PREFIX}foo')")
-    }
-    assert(e2.getMessage.contains(DATASOURCE_PREFIX + "foo"))
   }
 
   private def testSetLocation(isDatasourceTable: Boolean): Unit = {
@@ -1573,10 +1505,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
   }
 
-  test("create table with datasource properties (not allowed)") {
-    assertUnsupported("CREATE TABLE my_tab TBLPROPERTIES ('spark.sql.sources.me'='anything')")
-  }
-
   test("Create Hive Table As Select") {
     import testImplicits._
     withTable("t", "t1") {

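A minimal consolidated sketch of what the updated DDLSuite assertions above rely on (illustrative only; it assumes the usual suite fixtures such as the `spark` session, the `sql` helper, and package-private access to `sessionState`): after this patch the catalog returns fully restored metadata, and the internal `spark.sql.sources.*` keys never surface in `CatalogTable.properties`.

  import org.apache.spark.sql.catalyst.TableIdentifier
  import org.apache.spark.sql.types.{IntegerType, StructType}

  sql("CREATE TABLE tbl(a INT, b INT) USING parquet PARTITIONED BY (a)")
  val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("tbl"))
  assert(table.provider == Some("parquet"))
  assert(table.schema == new StructType().add("a", IntegerType).add("b", IntegerType))
  assert(table.partitionColumnNames == Seq("a"))
  // The caller no longer sees the table-property hack:
  assert(!table.properties.keys.exists(_.startsWith("spark.sql.sources.")))
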
http://git-wip-us.apache.org/repos/asf/spark/blob/b2074b66/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index 49153f7..729c9fd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -201,7 +201,7 @@ class CreateTableAsSelectSuite
          """.stripMargin
       )
       val table = catalog.getTableMetadata(TableIdentifier("t"))
-      assert(DDLUtils.getPartitionColumnsFromTableProperties(table) == Seq("a"))
+      assert(table.partitionColumnNames == Seq("a"))
     }
   }
 
@@ -217,8 +217,7 @@ class CreateTableAsSelectSuite
          """.stripMargin
       )
       val table = catalog.getTableMetadata(TableIdentifier("t"))
-      assert(DDLUtils.getBucketSpecFromTableProperties(table) ==
-        Option(BucketSpec(5, Seq("a"), Seq("b"))))
+      assert(table.bucketSpec == Option(BucketSpec(5, Seq("a"), Seq("b"))))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b2074b66/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 8302e3e..de3e60a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -30,7 +30,11 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
 import org.apache.spark.sql.hive.client.HiveClient
+import org.apache.spark.sql.internal.HiveSerDe
+import org.apache.spark.sql.types.{DataType, StructType}
 
 
 /**
@@ -41,6 +45,8 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu
   extends ExternalCatalog with Logging {
 
   import CatalogTypes.TablePartitionSpec
+  import HiveExternalCatalog._
+  import CatalogTableType._
 
   // Exceptions thrown by the hive client that we would like to wrap
   private val clientExceptions = Set(
@@ -81,6 +87,20 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu
     withClient { getTable(db, table) }
   }
 
+  /**
+   * If the given table properties contain datasource properties, throw an exception. We do this
+   * check when creating or altering a table, i.e. when we try to write table metadata to the Hive
+   * metastore.
+   */
+  private def verifyTableProperties(table: CatalogTable): Unit = {
+    val datasourceKeys = table.properties.keys.filter(_.startsWith(DATASOURCE_PREFIX))
+    if (datasourceKeys.nonEmpty) {
+      throw new AnalysisException(s"Cannot persist ${table.qualifiedName} into Hive metastore " +
+        s"as table property keys may not start with '$DATASOURCE_PREFIX': " +
+        datasourceKeys.mkString("[", ", ", "]"))
+    }
+  }
+
   // --------------------------------------------------------------------------
   // Databases
   // --------------------------------------------------------------------------
@@ -144,16 +164,162 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu
     assert(tableDefinition.identifier.database.isDefined)
     val db = tableDefinition.identifier.database.get
     requireDbExists(db)
+    verifyTableProperties(tableDefinition)
+
+    // Before saving data source table metadata into the Hive metastore, we should:
+    //  1. Put the table schema, partition column names and bucket specification in table properties.
+    //  2. Check whether this table is Hive compatible:
+    //    2.1  If it's not Hive compatible, set the schema, partition columns and bucket spec to
+    //         empty and save the table metadata to Hive.
+    //    2.2  If it's Hive compatible, set the serde information in the table metadata and try to
+    //         save it to Hive. If that fails, treat it as not Hive compatible and fall back to 2.1.
+    if (DDLUtils.isDatasourceTable(tableDefinition)) {
+      // A data source table always has a provider; this is guaranteed by `DDLUtils.isDatasourceTable`.
+      val provider = tableDefinition.provider.get
+      val partitionColumns = tableDefinition.partitionColumnNames
+      val bucketSpec = tableDefinition.bucketSpec
+
+      val tableProperties = new scala.collection.mutable.HashMap[String, String]
+      tableProperties.put(DATASOURCE_PROVIDER, provider)
+
+      // The serialized JSON schema string may be too long to be stored in a single metastore table
+      // property. In this case, we split the JSON string and store each part as a separate table
+      // property.
+      // TODO: the threshold should be set by `spark.sql.sources.schemaStringLengthThreshold`.
+      // However, the current SQLConf is session-isolated, which is not applicable to the external
+      // catalog. We should re-enable this conf instead of hard-coding the value here, once we have
+      // a global SQLConf.
+      val threshold = 4000
+      val schemaJsonString = tableDefinition.schema.json
+      // Split the JSON string.
+      val parts = schemaJsonString.grouped(threshold).toSeq
+      tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
+      parts.zipWithIndex.foreach { case (part, index) =>
+        tableProperties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
+      }
+
+      if (partitionColumns.nonEmpty) {
+        tableProperties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString)
+        partitionColumns.zipWithIndex.foreach { case (partCol, index) =>
+          tableProperties.put(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index", partCol)
+        }
+      }
+
+      if (bucketSpec.isDefined) {
+        val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get
+
+        tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETS, numBuckets.toString)
+        tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETCOLS, bucketColumnNames.length.toString)
+        bucketColumnNames.zipWithIndex.foreach { case (bucketCol, index) =>
+          tableProperties.put(s"$DATASOURCE_SCHEMA_BUCKETCOL_PREFIX$index", bucketCol)
+        }
+
+        if (sortColumnNames.nonEmpty) {
+          tableProperties.put(DATASOURCE_SCHEMA_NUMSORTCOLS, sortColumnNames.length.toString)
+          sortColumnNames.zipWithIndex.foreach { case (sortCol, index) =>
+            tableProperties.put(s"$DATASOURCE_SCHEMA_SORTCOL_PREFIX$index", sortCol)
+          }
+        }
+      }
+
+      // Converts the table metadata to a Spark SQL specific format, i.e. sets the schema, partition
+      // column names and bucket specification to empty.
+      def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
+        tableDefinition.copy(
+          schema = new StructType,
+          partitionColumnNames = Nil,
+          bucketSpec = None,
+          properties = tableDefinition.properties ++ tableProperties)
+      }
+
+      // Converts the table metadata to a Hive compatible format, i.e. sets the serde information.
+      def newHiveCompatibleMetastoreTable(serde: HiveSerDe, path: String): CatalogTable = {
+        tableDefinition.copy(
+          storage = tableDefinition.storage.copy(
+            locationUri = Some(new Path(path).toUri.toString),
+            inputFormat = serde.inputFormat,
+            outputFormat = serde.outputFormat,
+            serde = serde.serde
+          ),
+          properties = tableDefinition.properties ++ tableProperties)
+      }
+
+      val qualifiedTableName = tableDefinition.identifier.quotedString
+      val maybeSerde = HiveSerDe.sourceToSerDe(tableDefinition.provider.get)
+      val maybePath = new CaseInsensitiveMap(tableDefinition.storage.properties).get("path")
+      val skipHiveMetadata = tableDefinition.storage.properties
+        .getOrElse("skipHiveMetadata", "false").toBoolean
+
+      val (hiveCompatibleTable, logMessage) = (maybeSerde, maybePath) match {
+        case _ if skipHiveMetadata =>
+          val message =
+            s"Persisting data source table $qualifiedTableName into Hive metastore in" +
+              "Spark SQL specific format, which is NOT compatible with Hive."
+          (None, message)
+
+        // Our bucketing is not compatible with Hive's (it uses a different hash function).
+        case _ if tableDefinition.bucketSpec.nonEmpty =>
+          val message =
+            s"Persisting bucketed data source table $qualifiedTableName into " +
+              "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. "
+          (None, message)
+
+        case (Some(serde), Some(path)) =>
+          val message =
+            s"Persisting file based data source table $qualifiedTableName with an input path " +
+              s"into Hive metastore in Hive compatible format."
+          (Some(newHiveCompatibleMetastoreTable(serde, path)), message)
+
+        case (Some(_), None) =>
+          val message =
+            s"Data source table $qualifiedTableName is not file based. Persisting it into " +
+              s"Hive metastore in Spark SQL specific format, which is NOT compatible with Hive."
+          (None, message)
+
+        case _ =>
+          val provider = tableDefinition.provider.get
+          val message =
+            s"Couldn't find corresponding Hive SerDe for data source provider $provider. " +
+              s"Persisting data source table $qualifiedTableName into Hive metastore in " +
+              s"Spark SQL specific format, which is NOT compatible with Hive."
+          (None, message)
+      }
+
+      (hiveCompatibleTable, logMessage) match {
+        case (Some(table), message) =>
+          // We first try to save the metadata of the table in a Hive compatible way.
+          // If Hive throws an error, we fall back to save its metadata in the Spark SQL
+          // specific way.
+          try {
+            logInfo(message)
+            saveTableIntoHive(table, ignoreIfExists)
+          } catch {
+            case NonFatal(e) =>
+              val warningMessage =
+                s"Could not persist ${tableDefinition.identifier.quotedString} in a Hive " +
+                  "compatible way. Persisting it into Hive metastore in Spark SQL specific format."
+              logWarning(warningMessage, e)
+              saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists)
+          }
+
+        case (None, message) =>
+          logWarning(message)
+          saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists)
+      }
+    } else {
+      client.createTable(tableDefinition, ignoreIfExists)
+    }
+  }
 
-    if (
+  private def saveTableIntoHive(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
+    assert(DDLUtils.isDatasourceTable(tableDefinition),
+      "saveTableIntoHive only takes data source table.")
     // If this is an external data source table...
-      tableDefinition.properties.contains("spark.sql.sources.provider") &&
-        tableDefinition.tableType == CatalogTableType.EXTERNAL &&
-        // ... that is not persisted as Hive compatible format (external tables in Hive compatible
-        // format always set `locationUri` to the actual data location and should NOT be hacked as
-        // following.)
-        tableDefinition.storage.locationUri.isEmpty
-    ) {
+    if (tableDefinition.tableType == EXTERNAL &&
+      // ... that is not persisted as Hive compatible format (external tables in Hive compatible
+      // format always set `locationUri` to the actual data location and should NOT be hacked as
+      // following.)
+      tableDefinition.storage.locationUri.isEmpty) {
       // !! HACK ALERT !!
       //
       // Due to a restriction of Hive metastore, here we have to set `locationUri` to a temporary
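For concreteness, a hedged illustration (hypothetical example, not taken from the patch) of what the createTable branch above ends up writing into the metastore, using the DATASOURCE_* keys defined in the companion object further below; the schema JSON is abbreviated:

  // Hypothetical table:
  //   CREATE TABLE tbl(a INT, b INT, c INT) USING parquet
  //   PARTITIONED BY (c) CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS
  val expectedProps = Map(
    "spark.sql.sources.provider"             -> "parquet",
    "spark.sql.sources.schema.numParts"      -> "1",
    "spark.sql.sources.schema.part.0"        -> """{"type":"struct","fields":[...]}""",  // abbreviated
    "spark.sql.sources.schema.numPartCols"   -> "1",
    "spark.sql.sources.schema.partCol.0"     -> "c",
    "spark.sql.sources.schema.numBuckets"    -> "5",
    "spark.sql.sources.schema.numBucketCols" -> "1",
    "spark.sql.sources.schema.bucketCol.0"   -> "a",
    "spark.sql.sources.schema.numSortCols"   -> "1",
    "spark.sql.sources.schema.sortCol.0"     -> "b")

restoreTableMetadata (below) reads exactly these keys back into `schema`, `provider`, `partitionColumnNames` and `bucketSpec`, and strips them from the properties visible to callers.
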
@@ -200,22 +366,79 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu
    * Alter a table whose name that matches the one specified in `tableDefinition`,
    * assuming the table exists.
    *
-   * Note: As of now, this only supports altering table properties, serde properties,
-   * and num buckets!
+   * Note: As of now, this doesn't support altering the table schema, partition column names or
+   * bucket specification. These fields are ignored even if users specify different values for them.
    */
   override def alterTable(tableDefinition: CatalogTable): Unit = withClient {
     assert(tableDefinition.identifier.database.isDefined)
     val db = tableDefinition.identifier.database.get
     requireTableExists(db, tableDefinition.identifier.table)
-    client.alterTable(tableDefinition)
+    verifyTableProperties(tableDefinition)
+
+    if (DDLUtils.isDatasourceTable(tableDefinition)) {
+      val oldDef = client.getTable(db, tableDefinition.identifier.table)
+      // Set the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition,
+      // to retain the Spark specific format if it was used. Also add the old data source properties
+      // back to the table properties, to retain the data source table format.
+      val oldDataSourceProps = oldDef.properties.filter(_._1.startsWith(DATASOURCE_PREFIX))
+      val newDef = tableDefinition.copy(
+        schema = oldDef.schema,
+        partitionColumnNames = oldDef.partitionColumnNames,
+        bucketSpec = oldDef.bucketSpec,
+        properties = oldDataSourceProps ++ tableDefinition.properties)
+
+      client.alterTable(newDef)
+    } else {
+      client.alterTable(tableDefinition)
+    }
   }
 
   override def getTable(db: String, table: String): CatalogTable = withClient {
-    client.getTable(db, table)
+    restoreTableMetadata(client.getTable(db, table))
   }
 
   override def getTableOption(db: String, table: String): Option[CatalogTable] = withClient {
-    client.getTableOption(db, table)
+    client.getTableOption(db, table).map(restoreTableMetadata)
+  }
+
+  /**
+   * Restores table metadata from the table properties if it's a datasource table. This method is
+   * essentially the inverse of [[createTable]].
+   *
+   * It reads the table schema, provider, partition column names and bucket specification from the
+   * table properties, and filters these special entries out of the table properties.
+   */
+  private def restoreTableMetadata(table: CatalogTable): CatalogTable = {
+    if (table.tableType == VIEW) {
+      table
+    } else {
+      getProviderFromTableProperties(table).map { provider =>
+        assert(provider != "hive", "Hive serde table should not save provider in table properties.")
+        // SPARK-15269: Persisted data source tables always store the location URI as a storage
+        // property named "path" instead of standard Hive `dataLocation`, because Hive only
+        // allows directory paths as location URIs while Spark SQL data source tables also
+        // allow file paths. So the standard Hive `dataLocation` is meaningless for Spark SQL
+        // data source tables.
+        // Spark SQL may also save external data source tables in a Hive compatible format when
+        // possible, so that these tables can be directly accessed by Hive. For these tables,
+        // `dataLocation` is still necessary. Here we also check for input format because only
+        // these Hive compatible tables set this field.
+        val storage = if (table.tableType == EXTERNAL && table.storage.inputFormat.isEmpty) {
+          table.storage.copy(locationUri = None)
+        } else {
+          table.storage
+        }
+        table.copy(
+          storage = storage,
+          schema = getSchemaFromTableProperties(table),
+          provider = Some(provider),
+          partitionColumnNames = getPartitionColumnsFromTableProperties(table),
+          bucketSpec = getBucketSpecFromTableProperties(table),
+          properties = getOriginalTableProperties(table))
+      } getOrElse {
+        table.copy(provider = Some("hive"))
+      }
+    }
   }
 
   override def tableExists(db: String, table: String): Boolean = withClient {
@@ -363,3 +586,82 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu
   }
 
 }
+
+object HiveExternalCatalog {
+  val DATASOURCE_PREFIX = "spark.sql.sources."
+  val DATASOURCE_PROVIDER = DATASOURCE_PREFIX + "provider"
+  val DATASOURCE_SCHEMA = DATASOURCE_PREFIX + "schema"
+  val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "."
+  val DATASOURCE_SCHEMA_NUMPARTS = DATASOURCE_SCHEMA_PREFIX + "numParts"
+  val DATASOURCE_SCHEMA_NUMPARTCOLS = DATASOURCE_SCHEMA_PREFIX + "numPartCols"
+  val DATASOURCE_SCHEMA_NUMSORTCOLS = DATASOURCE_SCHEMA_PREFIX + "numSortCols"
+  val DATASOURCE_SCHEMA_NUMBUCKETS = DATASOURCE_SCHEMA_PREFIX + "numBuckets"
+  val DATASOURCE_SCHEMA_NUMBUCKETCOLS = DATASOURCE_SCHEMA_PREFIX + "numBucketCols"
+  val DATASOURCE_SCHEMA_PART_PREFIX = DATASOURCE_SCHEMA_PREFIX + "part."
+  val DATASOURCE_SCHEMA_PARTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "partCol."
+  val DATASOURCE_SCHEMA_BUCKETCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "bucketCol."
+  val DATASOURCE_SCHEMA_SORTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "sortCol."
+
+  def getProviderFromTableProperties(metadata: CatalogTable): Option[String] = {
+    metadata.properties.get(DATASOURCE_PROVIDER)
+  }
+
+  def getOriginalTableProperties(metadata: CatalogTable): Map[String, String] = {
+    metadata.properties.filterNot { case (key, _) => key.startsWith(DATASOURCE_PREFIX) }
+  }
+
+  // A persisted data source table always stores its schema in the catalog.
+  def getSchemaFromTableProperties(metadata: CatalogTable): StructType = {
+    val errorMessage = "Could not read schema from the hive metastore because it is corrupted."
+    val props = metadata.properties
+    props.get(DATASOURCE_SCHEMA).map { schema =>
+      // Originally, we used `spark.sql.sources.schema` to store the schema of a data source table.
+      // After SPARK-6024, we removed this flag.
+      // Although we no longer write `spark.sql.sources.schema`, we still need to support reading it.
+      DataType.fromJson(schema).asInstanceOf[StructType]
+    } getOrElse {
+      props.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts =>
+        val parts = (0 until numParts.toInt).map { index =>
+          val part = metadata.properties.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index").orNull
+          if (part == null) {
+            throw new AnalysisException(errorMessage +
+              s" (missing part $index of the schema, $numParts parts are expected).")
+          }
+          part
+        }
+        // Stick all parts back to a single schema string.
+        DataType.fromJson(parts.mkString).asInstanceOf[StructType]
+      } getOrElse {
+        throw new AnalysisException(errorMessage)
+      }
+    }
+  }
+
+  private def getColumnNamesByType(
+      props: Map[String, String],
+      colType: String,
+      typeName: String): Seq[String] = {
+    for {
+      numCols <- props.get(s"spark.sql.sources.schema.num${colType.capitalize}Cols").toSeq
+      index <- 0 until numCols.toInt
+    } yield props.getOrElse(
+      s"$DATASOURCE_SCHEMA_PREFIX${colType}Col.$index",
+      throw new AnalysisException(
+        s"Corrupted $typeName in catalog: $numCols parts expected, but part $index is missing."
+      )
+    )
+  }
+
+  def getPartitionColumnsFromTableProperties(metadata: CatalogTable): Seq[String] = {
+    getColumnNamesByType(metadata.properties, "part", "partitioning columns")
+  }
+
+  def getBucketSpecFromTableProperties(metadata: CatalogTable): Option[BucketSpec] = {
+    metadata.properties.get(DATASOURCE_SCHEMA_NUMBUCKETS).map { numBuckets =>
+      BucketSpec(
+        numBuckets.toInt,
+        getColumnNamesByType(metadata.properties, "bucket", "bucketing columns"),
+        getColumnNamesByType(metadata.properties, "sort", "sorting columns"))
+    }
+  }
+}

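To make the split/restore cycle concrete, here is a small self-contained sketch (illustrative only; the patch hard-codes a 4000-character threshold, while an artificially small one is used here so the JSON actually splits into several parts):

  import org.apache.spark.sql.types.{DataType, StructType}

  val schema = new StructType().add("col1", "int").add("col2", "string")
  val json = schema.json
  val threshold = 32  // the patch hard-codes 4000; small here only to force several parts

  // Write side (createTable): split the JSON and store each piece as a numbered property.
  val parts = json.grouped(threshold).toSeq
  val props =
    Map("spark.sql.sources.schema.numParts" -> parts.size.toString) ++
      parts.zipWithIndex.map { case (part, i) => s"spark.sql.sources.schema.part.$i" -> part }

  // Read side (getSchemaFromTableProperties): stitch the parts back together and parse.
  val numParts = props("spark.sql.sources.schema.numParts").toInt
  val joined = (0 until numParts).map(i => props(s"spark.sql.sources.schema.part.$i")).mkString
  assert(DataType.fromJson(joined).asInstanceOf[StructType] == schema)
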
http://git-wip-us.apache.org/repos/asf/spark/blob/b2074b66/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 7118eda..181f470 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
+import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{Partition => _, _}
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
 import org.apache.spark.sql.hive.orc.OrcFileFormat
@@ -68,64 +68,16 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() {
       override def load(in: QualifiedTableName): LogicalPlan = {
         logDebug(s"Creating new cached data source for $in")
-        val table = client.getTable(in.database, in.name)
+        val table = sparkSession.sharedState.externalCatalog.getTable(in.database, in.name)
 
-        // TODO: the following code is duplicated with FindDataSourceTable.readDataSourceTable
-
-        def schemaStringFromParts: Option[String] = {
-          table.properties.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts =>
-            val parts = (0 until numParts.toInt).map { index =>
-              val part = table.properties.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index").orNull
-              if (part == null) {
-                throw new AnalysisException(
-                  "Could not read schema from the metastore because it is corrupted " +
-                    s"(missing part $index of the schema, $numParts parts are expected).")
-              }
-
-              part
-            }
-            // Stick all parts back to a single schema string.
-            parts.mkString
-          }
-        }
-
-        def getColumnNames(colType: String): Seq[String] = {
-          table.properties.get(s"$DATASOURCE_SCHEMA.num${colType.capitalize}Cols").map {
-            numCols => (0 until numCols.toInt).map { index =>
-              table.properties.getOrElse(s"$DATASOURCE_SCHEMA_PREFIX${colType}Col.$index",
-                throw new AnalysisException(
-                  s"Could not read $colType columns from the metastore because it is corrupted " +
-                    s"(missing part $index of it, $numCols parts are expected)."))
-            }
-          }.getOrElse(Nil)
-        }
-
-        // Originally, we used spark.sql.sources.schema to store the schema of a data source table.
-        // After SPARK-6024, we removed this flag.
-        // Although we are not using spark.sql.sources.schema any more, we need to still support.
-        val schemaString = table.properties.get(DATASOURCE_SCHEMA).orElse(schemaStringFromParts)
-
-        val userSpecifiedSchema =
-          schemaString.map(s => DataType.fromJson(s).asInstanceOf[StructType])
-
-        // We only need names at here since userSpecifiedSchema we loaded from the metastore
-        // contains partition columns. We can always get data types of partitioning columns
-        // from userSpecifiedSchema.
-        val partitionColumns = getColumnNames("part")
-
-        val bucketSpec = table.properties.get(DATASOURCE_SCHEMA_NUMBUCKETS).map { n =>
-          BucketSpec(n.toInt, getColumnNames("bucket"), getColumnNames("sort"))
-        }
-
-        val options = table.storage.properties
         val dataSource =
           DataSource(
             sparkSession,
-            userSpecifiedSchema = userSpecifiedSchema,
-            partitionColumns = partitionColumns,
-            bucketSpec = bucketSpec,
-            className = table.properties(DATASOURCE_PROVIDER),
-            options = options)
+            userSpecifiedSchema = Some(table.schema),
+            partitionColumns = table.partitionColumnNames,
+            bucketSpec = table.bucketSpec,
+            className = table.provider.get,
+            options = table.storage.properties)
 
         LogicalRelation(
           dataSource.resolveRelation(checkPathExist = true),
@@ -158,9 +110,10 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       tableIdent: TableIdentifier,
       alias: Option[String]): LogicalPlan = {
     val qualifiedTableName = getQualifiedTableName(tableIdent)
-    val table = client.getTable(qualifiedTableName.database, qualifiedTableName.name)
+    val table = sparkSession.sharedState.externalCatalog.getTable(
+      qualifiedTableName.database, qualifiedTableName.name)
 
-    if (table.properties.get(DATASOURCE_PROVIDER).isDefined) {
+    if (DDLUtils.isDatasourceTable(table)) {
       val dataSourceTable = cachedDataSourceTables(qualifiedTableName)
       val qualifiedTable = SubqueryAlias(qualifiedTableName.name, dataSourceTable, None)
       // Then, if alias is specified, wrap the table with a Subquery using the alias.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org