You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/02/06 05:30:17 UTC

spark git commit: [SPARK-19279][SQL] Infer Schema for Hive Serde Tables and Block Creating a Hive Table With an Empty Schema

Repository: spark
Updated Branches:
  refs/heads/master 317fa7508 -> 65b10ffb3


[SPARK-19279][SQL] Infer Schema for Hive Serde Tables and Block Creating a Hive Table With an Empty Schema

### What changes were proposed in this pull request?
So far, we allow users to create a table with an empty schema: `CREATE TABLE tab1`. This could break many code paths if we keep allowing it. Thus, we should follow Hive and block it.

For Hive serde tables, some serde libraries require a user-specified schema and record it in the metastore. To get the list, we need to check `hive.serdes.using.metastore.for.schema`, which contains a list of serdes that require a user-specified schema. The default values are:

- org.apache.hadoop.hive.ql.io.orc.OrcSerde
- org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe
- org.apache.hadoop.hive.serde2.dynamic_type.DynamicSerDe
- org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe
- org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe
- org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
- org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

### How was this patch tested?
Added test cases for both Hive and data source tables.

Author: gatorsmile <ga...@gmail.com>

Closes #16636 from gatorsmile/fixEmptyTableSchema.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/65b10ffb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/65b10ffb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/65b10ffb

Branch: refs/heads/master
Commit: 65b10ffb3883cfed5b182db20b55a52ee0d89cba
Parents: 317fa75
Author: gatorsmile <ga...@gmail.com>
Authored: Mon Feb 6 13:30:07 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Mon Feb 6 13:30:07 2017 +0800

----------------------------------------------------------------------
 .../ml/source/libsvm/LibSVMRelationSuite.scala  |  27 ++++
 .../spark/sql/execution/datasources/rules.scala |   4 +-
 .../spark/sql/internal/SessionState.scala       |   2 +-
 .../spark/sql/execution/command/DDLSuite.scala  |  15 ++
 .../spark/sql/hive/HiveSessionState.scala       |   6 +-
 .../apache/spark/sql/hive/HiveStrategies.scala  |  59 +++++---
 .../org/apache/spark/sql/hive/HiveUtils.scala   | 138 ++++++++++++++++++-
 .../spark/sql/hive/MetastoreRelation.scala      |  89 +-----------
 .../spark/sql/hive/QueryPartitionSuite.scala    |  78 -----------
 .../spark/sql/hive/client/VersionsSuite.scala   |  84 ++++++++++-
 .../spark/sql/hive/execution/HiveDDLSuite.scala |  23 +++-
 11 files changed, 331 insertions(+), 194 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/65b10ffb/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala
index c701f38..478a83f 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala
@@ -110,4 +110,31 @@ class LibSVMRelationSuite extends SparkFunSuite with MLlibTestSparkContext {
     df.select("features").rdd.map { case Row(d: Vector) => d }.first
     df.select("features").collect
   }
+
+  test("create libsvmTable table without schema") {
+    try {
+      spark.sql(
+        s"""
+           |CREATE TABLE libsvmTable
+           |USING libsvm
+           |OPTIONS (
+           |  path '$path'
+           |)
+         """.stripMargin)
+      val df = spark.table("libsvmTable")
+      assert(df.columns(0) == "label")
+      assert(df.columns(1) == "features")
+    } finally {
+      spark.sql("DROP TABLE IF EXISTS libsvmTable")
+    }
+  }
+
+  test("create libsvmTable table without schema and path") {
+    try {
+      val e = intercept[IOException](spark.sql("CREATE TABLE libsvmTable USING libsvm"))
+      assert(e.getMessage.contains("No input path specified for libsvm data"))
+    } finally {
+      spark.sql("DROP TABLE IF EXISTS libsvmTable")
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/65b10ffb/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index d553d44..623d47b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -30,9 +30,9 @@ import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
 import org.apache.spark.sql.types.{AtomicType, StructType}
 
 /**
- * Try to replaces [[UnresolvedRelation]]s with [[ResolveDataSource]].
+ * Try to replaces [[UnresolvedRelation]]s if the plan is for direct query on files.
  */
-class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] {
+class ResolveSQLOnFile(sparkSession: SparkSession) extends Rule[LogicalPlan] {
   private def maybeSQLFile(u: UnresolvedRelation): Boolean = {
     sparkSession.sessionState.conf.runSQLonFile && u.tableIdentifier.database.isDefined
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/65b10ffb/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 68b774b..a5ebe47 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -115,7 +115,7 @@ private[sql] class SessionState(sparkSession: SparkSession) {
     new Analyzer(catalog, conf) {
       override val extendedResolutionRules =
         new FindDataSourceTable(sparkSession) ::
-        new ResolveDataSource(sparkSession) :: Nil
+        new ResolveSQLOnFile(sparkSession) :: Nil
 
       override val postHocResolutionRules =
         AnalyzeCreateTable(sparkSession) ::

http://git-wip-us.apache.org/repos/asf/spark/blob/65b10ffb/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 51f5946..f6d1ee2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1511,6 +1511,21 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
   }
 
+  test("create a data source table without schema") {
+    import testImplicits._
+    withTempPath { tempDir =>
+      withTable("tab1", "tab2") {
+        (("a", "b") :: Nil).toDF().write.json(tempDir.getCanonicalPath)
+
+        val e = intercept[AnalysisException] { sql("CREATE TABLE tab1 USING json") }.getMessage
+        assert(e.contains("Unable to infer schema for JSON. It must be specified manually"))
+
+        sql(s"CREATE TABLE tab2 using json location '${tempDir.getCanonicalPath}'")
+        checkAnswer(spark.table("tab2"), Row("a", "b"))
+      }
+    }
+  }
+
   test("create table using CLUSTERED BY without schema specification") {
     import testImplicits._
     withTempPath { tempDir =>

http://git-wip-us.apache.org/repos/asf/spark/blob/65b10ffb/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 9fd03ef..413712e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -62,16 +62,16 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
       override val extendedResolutionRules =
         catalog.ParquetConversions ::
         catalog.OrcConversions ::
-        new DetermineHiveSerde(conf) ::
+        new ResolveHiveSerdeTable(sparkSession) ::
         new FindDataSourceTable(sparkSession) ::
         new FindHiveSerdeTable(sparkSession) ::
-        new ResolveDataSource(sparkSession) :: Nil
+        new ResolveSQLOnFile(sparkSession) :: Nil
 
       override val postHocResolutionRules =
         AnalyzeCreateTable(sparkSession) ::
         PreprocessTableInsertion(conf) ::
         DataSourceAnalysis(conf) ::
-        new HiveAnalysis(sparkSession) :: Nil
+        HiveAnalysis :: Nil
 
       override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog))
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/65b10ffb/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 39be417..0f293c2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.hive
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, SimpleCatalogRelation}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, SimpleCatalogRelation}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, ScriptTransformation}
@@ -27,21 +27,24 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable, PreprocessTableInsertion}
 import org.apache.spark.sql.hive.execution._
-import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
+import org.apache.spark.sql.internal.HiveSerDe
 
 
 /**
- * Determine the serde/format of the Hive serde table, according to the storage properties.
+ * Determine the database, serde/format and schema of the Hive serde table, according to the storage
+ * properties.
  */
-class DetermineHiveSerde(conf: SQLConf) extends Rule[LogicalPlan] {
-  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case c @ CreateTable(t, _, query) if DDLUtils.isHiveTable(t) && t.storage.serde.isEmpty =>
-      if (t.bucketSpec.isDefined) {
+class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
+  private def determineHiveSerde(table: CatalogTable): CatalogTable = {
+    if (table.storage.serde.nonEmpty) {
+      table
+    } else {
+      if (table.bucketSpec.isDefined) {
         throw new AnalysisException("Creating bucketed Hive serde table is not supported yet.")
       }
 
-      val defaultStorage = HiveSerDe.getDefaultStorage(conf)
-      val options = new HiveOptions(t.storage.properties)
+      val defaultStorage = HiveSerDe.getDefaultStorage(session.sessionState.conf)
+      val options = new HiveOptions(table.storage.properties)
 
       val fileStorage = if (options.fileFormat.isDefined) {
         HiveSerDe.sourceToSerDe(options.fileFormat.get) match {
@@ -67,13 +70,39 @@ class DetermineHiveSerde(conf: SQLConf) extends Rule[LogicalPlan] {
         CatalogStorageFormat.empty
       }
 
-      val storage = t.storage.copy(
+      val storage = table.storage.copy(
         inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat),
         outputFormat = fileStorage.outputFormat.orElse(defaultStorage.outputFormat),
         serde = rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde),
         properties = options.serdeProperties)
 
-      c.copy(tableDesc = t.copy(storage = storage))
+      table.copy(storage = storage)
+    }
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case c @ CreateTable(t, _, query) if DDLUtils.isHiveTable(t) =>
+      // Finds the database name if the name does not exist.
+      val dbName = t.identifier.database.getOrElse(session.catalog.currentDatabase)
+      val table = t.copy(identifier = t.identifier.copy(database = Some(dbName)))
+
+      // Determines the serde/format of Hive tables
+      val withStorage = determineHiveSerde(table)
+
+      // Infers the schema, if empty, because the schema could be determined by Hive
+      // serde.
+      val catalogTable = if (query.isEmpty) {
+        val withSchema = HiveUtils.inferSchema(withStorage)
+        if (withSchema.schema.length <= 0) {
+          throw new AnalysisException("Unable to infer the schema. " +
+            s"The schema specification is required to create the table ${withSchema.identifier}.")
+        }
+        withSchema
+      } else {
+        withStorage
+      }
+
+      c.copy(tableDesc = catalogTable)
   }
 }
 
@@ -82,17 +111,13 @@ class DetermineHiveSerde(conf: SQLConf) extends Rule[LogicalPlan] {
  *
  * Note that, this rule must be run after `PreprocessTableInsertion`.
  */
-class HiveAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
+object HiveAnalysis extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case InsertIntoTable(table: MetastoreRelation, partSpec, query, overwrite, ifNotExists) =>
       InsertIntoHiveTable(table, partSpec, query, overwrite, ifNotExists)
 
     case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
-      val dbName = tableDesc.identifier.database.getOrElse(session.catalog.currentDatabase)
-      CreateHiveTableAsSelectCommand(
-        tableDesc.copy(identifier = tableDesc.identifier.copy(database = Some(dbName))),
-        query,
-        mode)
+      CreateHiveTableAsSelectCommand(tableDesc, query, mode)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/65b10ffb/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 26b1994..2822a55 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -24,18 +24,25 @@ import java.sql.Timestamp
 import java.util.concurrent.TimeUnit
 
 import scala.collection.mutable.HashMap
+import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.common.`type`.HiveDecimal
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
+import org.apache.hadoop.hive.metastore.api.FieldSchema
+import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
 import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
 import org.apache.hadoop.util.VersionInfo
 
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{SparkConf, SparkContext, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition, CatalogTableType}
+import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
+import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf._
@@ -455,4 +462,133 @@ private[spark] object HiveUtils extends Logging {
     case (decimal, DecimalType()) => decimal.toString
     case (other, tpe) if primitiveTypes contains tpe => other.toString
   }
+
+  /** Converts the native StructField to Hive's FieldSchema. */
+  private def toHiveColumn(c: StructField): FieldSchema = {
+    val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) {
+      c.metadata.getString(HiveUtils.hiveTypeString)
+    } else {
+      c.dataType.catalogString
+    }
+    new FieldSchema(c.name, typeString, c.getComment.orNull)
+  }
+
+  /** Builds the native StructField from Hive's FieldSchema. */
+  private def fromHiveColumn(hc: FieldSchema): StructField = {
+    val columnType = try {
+      CatalystSqlParser.parseDataType(hc.getType)
+    } catch {
+      case e: ParseException =>
+        throw new SparkException("Cannot recognize hive type string: " + hc.getType, e)
+    }
+
+    val metadata = new MetadataBuilder().putString(HiveUtils.hiveTypeString, hc.getType).build()
+    val field = StructField(
+      name = hc.getName,
+      dataType = columnType,
+      nullable = true,
+      metadata = metadata)
+    Option(hc.getComment).map(field.withComment).getOrElse(field)
+  }
+
+  // TODO: merge this with HiveClientImpl#toHiveTable
+  /** Converts the native table metadata representation format CatalogTable to Hive's Table. */
+  def toHiveTable(catalogTable: CatalogTable): HiveTable = {
+    // We start by constructing an API table as Hive performs several important transformations
+    // internally when converting an API table to a QL table.
+    val tTable = new org.apache.hadoop.hive.metastore.api.Table()
+    tTable.setTableName(catalogTable.identifier.table)
+    tTable.setDbName(catalogTable.database)
+
+    val tableParameters = new java.util.HashMap[String, String]()
+    tTable.setParameters(tableParameters)
+    catalogTable.properties.foreach { case (k, v) => tableParameters.put(k, v) }
+
+    tTable.setTableType(catalogTable.tableType match {
+      case CatalogTableType.EXTERNAL => HiveTableType.EXTERNAL_TABLE.toString
+      case CatalogTableType.MANAGED => HiveTableType.MANAGED_TABLE.toString
+      case CatalogTableType.VIEW => HiveTableType.VIRTUAL_VIEW.toString
+    })
+
+    val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
+    tTable.setSd(sd)
+
+    // Note: In Hive the schema and partition columns must be disjoint sets
+    val (partCols, schema) = catalogTable.schema.map(toHiveColumn).partition { c =>
+      catalogTable.partitionColumnNames.contains(c.getName)
+    }
+    sd.setCols(schema.asJava)
+    tTable.setPartitionKeys(partCols.asJava)
+
+    catalogTable.storage.locationUri.foreach(sd.setLocation)
+    catalogTable.storage.inputFormat.foreach(sd.setInputFormat)
+    catalogTable.storage.outputFormat.foreach(sd.setOutputFormat)
+
+    val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
+    catalogTable.storage.serde.foreach(serdeInfo.setSerializationLib)
+    sd.setSerdeInfo(serdeInfo)
+
+    val serdeParameters = new java.util.HashMap[String, String]()
+    catalogTable.storage.properties.foreach { case (k, v) => serdeParameters.put(k, v) }
+    serdeInfo.setParameters(serdeParameters)
+
+    new HiveTable(tTable)
+  }
+
+  /**
+   * Converts the native partition metadata representation format CatalogTablePartition to
+   * Hive's Partition.
+   */
+  def toHivePartition(
+      catalogTable: CatalogTable,
+      hiveTable: HiveTable,
+      partition: CatalogTablePartition): HivePartition = {
+    val tPartition = new org.apache.hadoop.hive.metastore.api.Partition
+    tPartition.setDbName(catalogTable.database)
+    tPartition.setTableName(catalogTable.identifier.table)
+    tPartition.setValues(catalogTable.partitionColumnNames.map(partition.spec(_)).asJava)
+
+    val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
+    tPartition.setSd(sd)
+
+    // Note: In Hive the schema and partition columns must be disjoint sets
+    val schema = catalogTable.schema.map(toHiveColumn).filter { c =>
+      !catalogTable.partitionColumnNames.contains(c.getName)
+    }
+    sd.setCols(schema.asJava)
+
+    partition.storage.locationUri.foreach(sd.setLocation)
+    partition.storage.inputFormat.foreach(sd.setInputFormat)
+    partition.storage.outputFormat.foreach(sd.setOutputFormat)
+
+    val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
+    sd.setSerdeInfo(serdeInfo)
+    // maps and lists should be set only after all elements are ready (see HIVE-7975)
+    partition.storage.serde.foreach(serdeInfo.setSerializationLib)
+
+    val serdeParameters = new java.util.HashMap[String, String]()
+    catalogTable.storage.properties.foreach { case (k, v) => serdeParameters.put(k, v) }
+    partition.storage.properties.foreach { case (k, v) => serdeParameters.put(k, v) }
+    serdeInfo.setParameters(serdeParameters)
+
+    new HivePartition(hiveTable, tPartition)
+  }
+
+  /**
+   * Infers the schema for Hive serde tables and returns the CatalogTable with the inferred schema.
+   * When the tables are data source tables or the schema already exists, returns the original
+   * CatalogTable.
+   */
+  def inferSchema(table: CatalogTable): CatalogTable = {
+    if (DDLUtils.isDatasourceTable(table) || table.schema.nonEmpty) {
+      table
+    } else {
+      val hiveTable = toHiveTable(table)
+      // Note: Hive separates partition columns and the schema, but for us the
+      // partition columns are part of the schema
+      val partCols = hiveTable.getPartCols.asScala.map(fromHiveColumn)
+      val schema = StructType(hiveTable.getCols.asScala.map(fromHiveColumn) ++ partCols)
+      table.copy(schema = schema)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/65b10ffb/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index 346757c..6394eb6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -19,13 +19,9 @@ package org.apache.spark.sql.hive
 
 import java.io.IOException
 
-import scala.collection.JavaConverters._
-
 import com.google.common.base.Objects
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.hive.common.StatsSetupConst
-import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
-import org.apache.hadoop.hive.metastore.api.FieldSchema
 import org.apache.hadoop.hive.ql.metadata.{Partition, Table => HiveTable}
 import org.apache.hadoop.hive.ql.plan.TableDesc
 
@@ -60,57 +56,7 @@ private[hive] case class MetastoreRelation(
 
   override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sparkSession :: Nil
 
-  private def toHiveColumn(c: StructField): FieldSchema = {
-    val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) {
-      c.metadata.getString(HiveUtils.hiveTypeString)
-    } else {
-      c.dataType.catalogString
-    }
-    new FieldSchema(c.name, typeString, c.getComment.orNull)
-  }
-
-  // TODO: merge this with HiveClientImpl#toHiveTable
-  @transient val hiveQlTable: HiveTable = {
-    // We start by constructing an API table as Hive performs several important transformations
-    // internally when converting an API table to a QL table.
-    val tTable = new org.apache.hadoop.hive.metastore.api.Table()
-    tTable.setTableName(catalogTable.identifier.table)
-    tTable.setDbName(catalogTable.database)
-
-    val tableParameters = new java.util.HashMap[String, String]()
-    tTable.setParameters(tableParameters)
-    catalogTable.properties.foreach { case (k, v) => tableParameters.put(k, v) }
-
-    tTable.setTableType(catalogTable.tableType match {
-      case CatalogTableType.EXTERNAL => HiveTableType.EXTERNAL_TABLE.toString
-      case CatalogTableType.MANAGED => HiveTableType.MANAGED_TABLE.toString
-      case CatalogTableType.VIEW => HiveTableType.VIRTUAL_VIEW.toString
-    })
-
-    val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
-    tTable.setSd(sd)
-
-    // Note: In Hive the schema and partition columns must be disjoint sets
-    val (partCols, schema) = catalogTable.schema.map(toHiveColumn).partition { c =>
-      catalogTable.partitionColumnNames.contains(c.getName)
-    }
-    sd.setCols(schema.asJava)
-    tTable.setPartitionKeys(partCols.asJava)
-
-    catalogTable.storage.locationUri.foreach(sd.setLocation)
-    catalogTable.storage.inputFormat.foreach(sd.setInputFormat)
-    catalogTable.storage.outputFormat.foreach(sd.setOutputFormat)
-
-    val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
-    catalogTable.storage.serde.foreach(serdeInfo.setSerializationLib)
-    sd.setSerdeInfo(serdeInfo)
-
-    val serdeParameters = new java.util.HashMap[String, String]()
-    catalogTable.storage.properties.foreach { case (k, v) => serdeParameters.put(k, v) }
-    serdeInfo.setParameters(serdeParameters)
-
-    new HiveTable(tTable)
-  }
+  @transient val hiveQlTable: HiveTable = HiveUtils.toHiveTable(catalogTable)
 
   @transient override def computeStats(conf: CatalystConf): Statistics = {
     catalogTable.stats.map(_.toPlanStats(output)).getOrElse(Statistics(
@@ -165,38 +111,7 @@ private[hive] case class MetastoreRelation(
     } else {
       allPartitions
     }
-
-    rawPartitions.map { p =>
-      val tPartition = new org.apache.hadoop.hive.metastore.api.Partition
-      tPartition.setDbName(databaseName)
-      tPartition.setTableName(tableName)
-      tPartition.setValues(partitionKeys.map(a => p.spec(a.name)).asJava)
-
-      val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
-      tPartition.setSd(sd)
-
-      // Note: In Hive the schema and partition columns must be disjoint sets
-      val schema = catalogTable.schema.map(toHiveColumn).filter { c =>
-        !catalogTable.partitionColumnNames.contains(c.getName)
-      }
-      sd.setCols(schema.asJava)
-
-      p.storage.locationUri.foreach(sd.setLocation)
-      p.storage.inputFormat.foreach(sd.setInputFormat)
-      p.storage.outputFormat.foreach(sd.setOutputFormat)
-
-      val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
-      sd.setSerdeInfo(serdeInfo)
-      // maps and lists should be set only after all elements are ready (see HIVE-7975)
-      p.storage.serde.foreach(serdeInfo.setSerializationLib)
-
-      val serdeParameters = new java.util.HashMap[String, String]()
-      catalogTable.storage.properties.foreach { case (k, v) => serdeParameters.put(k, v) }
-      p.storage.properties.foreach { case (k, v) => serdeParameters.put(k, v) }
-      serdeInfo.setParameters(serdeParameters)
-
-      new Partition(hiveQlTable, tPartition)
-    }
+    rawPartitions.map(HiveUtils.toHivePartition(catalogTable, hiveQlTable, _))
   }
 
   /** Only compare database and tablename, not alias. */

http://git-wip-us.apache.org/repos/asf/spark/blob/65b10ffb/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
index b20c10c..43b6bf5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
@@ -68,82 +68,4 @@ class QueryPartitionSuite extends QueryTest with SQLTestUtils with TestHiveSingl
       sql("DROP TABLE IF EXISTS createAndInsertTest")
     }
   }
-
-  test("SPARK-13709: reading partitioned Avro table with nested schema") {
-    withTempDir { dir =>
-      val path = dir.toURI.toString
-      val tableName = "spark_13709"
-      val tempTableName = "spark_13709_temp"
-
-      new File(dir.getAbsolutePath, tableName).mkdir()
-      new File(dir.getAbsolutePath, tempTableName).mkdir()
-
-      val avroSchema =
-        """{
-          |  "name": "test_record",
-          |  "type": "record",
-          |  "fields": [ {
-          |    "name": "f0",
-          |    "type": "int"
-          |  }, {
-          |    "name": "f1",
-          |    "type": {
-          |      "type": "record",
-          |      "name": "inner",
-          |      "fields": [ {
-          |        "name": "f10",
-          |        "type": "int"
-          |      }, {
-          |        "name": "f11",
-          |        "type": "double"
-          |      } ]
-          |    }
-          |  } ]
-          |}
-        """.stripMargin
-
-      withTable(tableName, tempTableName) {
-        // Creates the external partitioned Avro table to be tested.
-        sql(
-          s"""CREATE EXTERNAL TABLE $tableName
-             |PARTITIONED BY (ds STRING)
-             |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
-             |STORED AS
-             |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
-             |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
-             |LOCATION '$path/$tableName'
-             |TBLPROPERTIES ('avro.schema.literal' = '$avroSchema')
-           """.stripMargin
-        )
-
-        // Creates an temporary Avro table used to prepare testing Avro file.
-        sql(
-          s"""CREATE EXTERNAL TABLE $tempTableName
-             |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
-             |STORED AS
-             |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
-             |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
-             |LOCATION '$path/$tempTableName'
-             |TBLPROPERTIES ('avro.schema.literal' = '$avroSchema')
-           """.stripMargin
-        )
-
-        // Generates Avro data.
-        sql(s"INSERT OVERWRITE TABLE $tempTableName SELECT 1, STRUCT(2, 2.5)")
-
-        // Adds generated Avro data as a new partition to the testing table.
-        sql(s"ALTER TABLE $tableName ADD PARTITION (ds = 'foo') LOCATION '$path/$tempTableName'")
-
-        // The following query fails before SPARK-13709 is fixed. This is because when reading data
-        // from table partitions, Avro deserializer needs the Avro schema, which is defined in
-        // table property "avro.schema.literal". However, we only initializes the deserializer using
-        // partition properties, which doesn't include the wanted property entry. Merging two sets
-        // of properties solves the problem.
-        checkAnswer(
-          sql(s"SELECT * FROM $tableName"),
-          Row(1, Row(2, 2.5D), "foo")
-        )
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/65b10ffb/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 28b5bfd..ca39c7e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -24,9 +24,8 @@ import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
 import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
 import org.apache.hadoop.mapred.TextInputFormat
 
-import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPermanentFunctionException}
 import org.apache.spark.sql.catalyst.catalog._
@@ -47,7 +46,7 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}
  * is not fully tested.
  */
 @ExtendedHiveTest
-class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSingleton with Logging {
+class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton with Logging {
 
   private val clientBuilder = new HiveClientBuilder
   import clientBuilder.buildClient
@@ -571,6 +570,85 @@ class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSinglet
       }
     }
 
+
+    test(s"$version: SPARK-13709: reading partitioned Avro table with nested schema") {
+      withTempDir { dir =>
+        val path = dir.toURI.toString
+        val tableName = "spark_13709"
+        val tempTableName = "spark_13709_temp"
+
+        new File(dir.getAbsolutePath, tableName).mkdir()
+        new File(dir.getAbsolutePath, tempTableName).mkdir()
+
+        val avroSchema =
+          """{
+            |  "name": "test_record",
+            |  "type": "record",
+            |  "fields": [ {
+            |    "name": "f0",
+            |    "type": "int"
+            |  }, {
+            |    "name": "f1",
+            |    "type": {
+            |      "type": "record",
+            |      "name": "inner",
+            |      "fields": [ {
+            |        "name": "f10",
+            |        "type": "int"
+            |      }, {
+            |        "name": "f11",
+            |        "type": "double"
+            |      } ]
+            |    }
+            |  } ]
+            |}
+          """.stripMargin
+
+        withTable(tableName, tempTableName) {
+          // Creates the external partitioned Avro table to be tested.
+          sql(
+            s"""CREATE EXTERNAL TABLE $tableName
+               |PARTITIONED BY (ds STRING)
+               |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+               |STORED AS
+               |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+               |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+               |LOCATION '$path/$tableName'
+               |TBLPROPERTIES ('avro.schema.literal' = '$avroSchema')
+           """.stripMargin
+          )
+
+          // Creates a temporary Avro table used to prepare the testing Avro file.
+          sql(
+            s"""CREATE EXTERNAL TABLE $tempTableName
+               |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+               |STORED AS
+               |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+               |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+               |LOCATION '$path/$tempTableName'
+               |TBLPROPERTIES ('avro.schema.literal' = '$avroSchema')
+           """.stripMargin
+          )
+
+          // Generates Avro data.
+          sql(s"INSERT OVERWRITE TABLE $tempTableName SELECT 1, STRUCT(2, 2.5)")
+
+          // Adds generated Avro data as a new partition to the testing table.
+          sql(s"ALTER TABLE $tableName ADD PARTITION (ds = 'foo') LOCATION '$path/$tempTableName'")
+
+          // The following query fails before SPARK-13709 is fixed. This is because when reading
+          // data from table partitions, Avro deserializer needs the Avro schema, which is defined
+          // in table property "avro.schema.literal". However, we only initialize the deserializer
+          // using partition properties, which doesn't include the wanted property entry. Merging
+          // two sets of properties solves the problem.
+          checkAnswer(
+            sql(s"SELECT * FROM $tableName"),
+            Row(1, Row(2, 2.5D), "foo")
+          )
+        }
+      }
+    }
+
     // TODO: add more tests.
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/65b10ffb/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 58be079..9d9f3a6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -79,6 +79,25 @@ class HiveDDLSuite
     }
   }
 
+  test("create a hive table without schema") {
+    import testImplicits._
+    withTempPath { tempDir =>
+      withTable("tab1", "tab2") {
+        (("a", "b") :: Nil).toDF().write.json(tempDir.getCanonicalPath)
+
+        var e = intercept[AnalysisException] { sql("CREATE TABLE tab1 USING hive") }.getMessage
+        assert(e.contains("Unable to infer the schema. The schema specification is required to " +
+          "create the table `default`.`tab1`"))
+
+        e = intercept[AnalysisException] {
+          sql(s"CREATE TABLE tab2 location '${tempDir.getCanonicalPath}'")
+        }.getMessage
+        assert(e.contains("Unable to infer the schema. The schema specification is required to " +
+          "create the table `default`.`tab2`"))
+      }
+    }
+  }
+
   test("drop external tables in default database") {
     withTempDir { tmpDir =>
       val tabName = "tab1"
@@ -199,7 +218,7 @@ class HiveDDLSuite
     val e = intercept[AnalysisException] {
       sql("CREATE TABLE tbl(a int) PARTITIONED BY (a string)")
     }
-    assert(e.message == "Found duplicate column(s) in table definition of `tbl`: a")
+    assert(e.message == "Found duplicate column(s) in table definition of `default`.`tbl`: a")
   }
 
   test("add/drop partition with location - managed table") {
@@ -1192,7 +1211,7 @@ class HiveDDLSuite
         assert(e2.getMessage.contains(forbiddenPrefix + "foo"))
 
         val e3 = intercept[AnalysisException] {
-          sql(s"CREATE TABLE tbl TBLPROPERTIES ('${forbiddenPrefix}foo'='anything')")
+          sql(s"CREATE TABLE tbl (a INT) TBLPROPERTIES ('${forbiddenPrefix}foo'='anything')")
         }
         assert(e3.getMessage.contains(forbiddenPrefix + "foo"))
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org