You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/02/12 06:21:35 UTC

spark git commit: [SPARK-19448][SQL] optimize some duplication functions between HiveClientImpl and HiveUtils

Repository: spark
Updated Branches:
  refs/heads/master 0fbecc736 -> 3881f342b


[SPARK-19448][SQL] optimize some duplication functions between HiveClientImpl and HiveUtils

## What changes were proposed in this pull request?

There are some duplicate functions between `HiveClientImpl` and `HiveUtils`, we can merge them to one place. such as: `toHiveTable` \u3001`toHivePartition`\u3001`fromHivePartition`.

And additional modify is change `MetastoreRelation.attributes` to `MetastoreRelation.dataColKeys`
https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala#L234

## How was this patch tested?
N/A

Author: windpiger <so...@outlook.com>

Closes #16787 from windpiger/todoInMetaStoreRelation.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3881f342
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3881f342
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3881f342

Branch: refs/heads/master
Commit: 3881f342b49efdb1e0d5ee27f616451ea1928c5d
Parents: 0fbecc7
Author: windpiger <so...@outlook.com>
Authored: Sat Feb 11 22:21:14 2017 -0800
Committer: Xiao Li <ga...@gmail.com>
Committed: Sat Feb 11 22:21:14 2017 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/hive/HiveUtils.scala   | 127 +------------------
 .../spark/sql/hive/MetastoreRelation.scala      |  13 +-
 .../spark/sql/hive/client/HiveClientImpl.scala  |  70 ++++++----
 .../sql/hive/execution/HiveTableScanExec.scala  |   2 +-
 ...ernalCatalogBackwardCompatibilitySuite.scala |   5 +
 .../sql/hive/MetastoreDataSourcesSuite.scala    |   3 +
 .../spark/sql/hive/client/VersionsSuite.scala   |  25 +++-
 7 files changed, 88 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3881f342/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 13ab4e8..afc2bf8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -31,17 +31,13 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.common.`type`.HiveDecimal
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
-import org.apache.hadoop.hive.metastore.api.FieldSchema
-import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
 import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
 import org.apache.hadoop.util.VersionInfo
 
-import org.apache.spark.{SparkConf, SparkContext, SparkException}
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition, CatalogTableType}
-import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.internal.SQLConf
@@ -455,117 +451,6 @@ private[spark] object HiveUtils extends Logging {
     case (other, tpe) if primitiveTypes contains tpe => other.toString
   }
 
-  /** Converts the native StructField to Hive's FieldSchema. */
-  private def toHiveColumn(c: StructField): FieldSchema = {
-    val typeString = if (c.metadata.contains(HIVE_TYPE_STRING)) {
-      c.metadata.getString(HIVE_TYPE_STRING)
-    } else {
-      c.dataType.catalogString
-    }
-    new FieldSchema(c.name, typeString, c.getComment.orNull)
-  }
-
-  /** Builds the native StructField from Hive's FieldSchema. */
-  private def fromHiveColumn(hc: FieldSchema): StructField = {
-    val columnType = try {
-      CatalystSqlParser.parseDataType(hc.getType)
-    } catch {
-      case e: ParseException =>
-        throw new SparkException("Cannot recognize hive type string: " + hc.getType, e)
-    }
-
-    val metadata = new MetadataBuilder().putString(HIVE_TYPE_STRING, hc.getType).build()
-    val field = StructField(
-      name = hc.getName,
-      dataType = columnType,
-      nullable = true,
-      metadata = metadata)
-    Option(hc.getComment).map(field.withComment).getOrElse(field)
-  }
-
-  // TODO: merge this with HiveClientImpl#toHiveTable
-  /** Converts the native table metadata representation format CatalogTable to Hive's Table. */
-  def toHiveTable(catalogTable: CatalogTable): HiveTable = {
-    // We start by constructing an API table as Hive performs several important transformations
-    // internally when converting an API table to a QL table.
-    val tTable = new org.apache.hadoop.hive.metastore.api.Table()
-    tTable.setTableName(catalogTable.identifier.table)
-    tTable.setDbName(catalogTable.database)
-
-    val tableParameters = new java.util.HashMap[String, String]()
-    tTable.setParameters(tableParameters)
-    catalogTable.properties.foreach { case (k, v) => tableParameters.put(k, v) }
-
-    tTable.setTableType(catalogTable.tableType match {
-      case CatalogTableType.EXTERNAL => HiveTableType.EXTERNAL_TABLE.toString
-      case CatalogTableType.MANAGED => HiveTableType.MANAGED_TABLE.toString
-      case CatalogTableType.VIEW => HiveTableType.VIRTUAL_VIEW.toString
-    })
-
-    val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
-    tTable.setSd(sd)
-
-    // Note: In Hive the schema and partition columns must be disjoint sets
-    val (partCols, schema) = catalogTable.schema.map(toHiveColumn).partition { c =>
-      catalogTable.partitionColumnNames.contains(c.getName)
-    }
-    sd.setCols(schema.asJava)
-    tTable.setPartitionKeys(partCols.asJava)
-
-    catalogTable.storage.locationUri.foreach(sd.setLocation)
-    catalogTable.storage.inputFormat.foreach(sd.setInputFormat)
-    catalogTable.storage.outputFormat.foreach(sd.setOutputFormat)
-
-    val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
-    catalogTable.storage.serde.foreach(serdeInfo.setSerializationLib)
-    sd.setSerdeInfo(serdeInfo)
-
-    val serdeParameters = new java.util.HashMap[String, String]()
-    catalogTable.storage.properties.foreach { case (k, v) => serdeParameters.put(k, v) }
-    serdeInfo.setParameters(serdeParameters)
-
-    new HiveTable(tTable)
-  }
-
-  /**
-   * Converts the native partition metadata representation format CatalogTablePartition to
-   * Hive's Partition.
-   */
-  def toHivePartition(
-      catalogTable: CatalogTable,
-      hiveTable: HiveTable,
-      partition: CatalogTablePartition): HivePartition = {
-    val tPartition = new org.apache.hadoop.hive.metastore.api.Partition
-    tPartition.setDbName(catalogTable.database)
-    tPartition.setTableName(catalogTable.identifier.table)
-    tPartition.setValues(catalogTable.partitionColumnNames.map(partition.spec(_)).asJava)
-
-    val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
-    tPartition.setSd(sd)
-
-    // Note: In Hive the schema and partition columns must be disjoint sets
-    val schema = catalogTable.schema.map(toHiveColumn).filter { c =>
-      !catalogTable.partitionColumnNames.contains(c.getName)
-    }
-    sd.setCols(schema.asJava)
-
-    partition.storage.locationUri.foreach(sd.setLocation)
-    partition.storage.inputFormat.foreach(sd.setInputFormat)
-    partition.storage.outputFormat.foreach(sd.setOutputFormat)
-
-    val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
-    sd.setSerdeInfo(serdeInfo)
-    // maps and lists should be set only after all elements are ready (see HIVE-7975)
-    partition.storage.serde.foreach(serdeInfo.setSerializationLib)
-
-    val serdeParameters = new java.util.HashMap[String, String]()
-    catalogTable.storage.properties.foreach { case (k, v) => serdeParameters.put(k, v) }
-    partition.storage.properties.foreach { case (k, v) => serdeParameters.put(k, v) }
-    serdeInfo.setParameters(serdeParameters)
-
-    new HivePartition(hiveTable, tPartition)
-  }
-
   /**
    * Infers the schema for Hive serde tables and returns the CatalogTable with the inferred schema.
    * When the tables are data source tables or the schema already exists, returns the original
@@ -575,12 +460,12 @@ private[spark] object HiveUtils extends Logging {
     if (DDLUtils.isDatasourceTable(table) || table.dataSchema.nonEmpty) {
       table
     } else {
-      val hiveTable = toHiveTable(table)
+      val hiveTable = HiveClientImpl.toHiveTable(table)
       // Note: Hive separates partition columns and the schema, but for us the
       // partition columns are part of the schema
-      val partCols = hiveTable.getPartCols.asScala.map(fromHiveColumn)
-      val schema = StructType(hiveTable.getCols.asScala.map(fromHiveColumn) ++ partCols)
-      table.copy(schema = schema)
+      val partCols = hiveTable.getPartCols.asScala.map(HiveClientImpl.fromHiveColumn)
+      val dataCols = hiveTable.getCols.asScala.map(HiveClientImpl.fromHiveColumn)
+      table.copy(schema = StructType(dataCols ++ partCols))
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3881f342/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index 6394eb6..97b1207 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.execution.FileRelation
+import org.apache.spark.sql.hive.client.HiveClientImpl
 import org.apache.spark.sql.types.StructField
 
 
@@ -56,7 +57,7 @@ private[hive] case class MetastoreRelation(
 
   override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sparkSession :: Nil
 
-  @transient val hiveQlTable: HiveTable = HiveUtils.toHiveTable(catalogTable)
+  @transient val hiveQlTable: HiveTable = HiveClientImpl.toHiveTable(catalogTable)
 
   @transient override def computeStats(conf: CatalystConf): Statistics = {
     catalogTable.stats.map(_.toPlanStats(output)).getOrElse(Statistics(
@@ -111,7 +112,8 @@ private[hive] case class MetastoreRelation(
     } else {
       allPartitions
     }
-    rawPartitions.map(HiveUtils.toHivePartition(catalogTable, hiveQlTable, _))
+
+    rawPartitions.map(HiveClientImpl.toHivePartition(_, hiveQlTable))
   }
 
   /** Only compare database and tablename, not alias. */
@@ -146,18 +148,17 @@ private[hive] case class MetastoreRelation(
   val partitionKeys = catalogTable.partitionSchema.map(_.toAttribute)
 
   /** Non-partitionKey attributes */
-  // TODO: just make this hold the schema itself, not just non-partition columns
-  val attributes = catalogTable.schema
+  val dataColKeys = catalogTable.schema
     .filter { c => !catalogTable.partitionColumnNames.contains(c.name) }
     .map(_.toAttribute)
 
-  val output = attributes ++ partitionKeys
+  val output = dataColKeys ++ partitionKeys
 
   /** An attribute map that can be used to lookup original attributes based on expression id. */
   val attributeMap = AttributeMap(output.map(o => (o, o)))
 
   /** An attribute map for determining the ordinal for non-partition columns. */
-  val columnOrdinals = AttributeMap(attributes.zipWithIndex)
+  val columnOrdinals = AttributeMap(dataColKeys.zipWithIndex)
 
   override def inputFiles: Array[String] = {
     val partLocations = allPartitions

http://git-wip-us.apache.org/repos/asf/spark/blob/3881f342/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index f0d01eb..dc9c3ff 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -46,7 +46,8 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.execution.QueryExecutionException
-import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.hive.client.HiveClientImpl._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{CircularBuffer, Utils}
 
@@ -435,7 +436,7 @@ private[hive] class HiveClientImpl(
   }
 
   override def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = withHiveState {
-    client.createTable(toHiveTable(table), ignoreIfExists)
+    client.createTable(toHiveTable(table, Some(conf)), ignoreIfExists)
   }
 
   override def dropTable(
@@ -447,7 +448,7 @@ private[hive] class HiveClientImpl(
   }
 
   override def alterTable(tableName: String, table: CatalogTable): Unit = withHiveState {
-    val hiveTable = toHiveTable(table)
+    val hiveTable = toHiveTable(table, Some(conf))
     // Do not use `table.qualifiedName` here because this may be a rename
     val qualifiedTableName = s"${table.database}.$tableName"
     client.alterTable(qualifiedTableName, hiveTable)
@@ -516,7 +517,7 @@ private[hive] class HiveClientImpl(
       newSpecs: Seq[TablePartitionSpec]): Unit = withHiveState {
     require(specs.size == newSpecs.size, "number of old and new partition specs differ")
     val catalogTable = getTable(db, table)
-    val hiveTable = toHiveTable(catalogTable)
+    val hiveTable = toHiveTable(catalogTable, Some(conf))
     specs.zip(newSpecs).foreach { case (oldSpec, newSpec) =>
       val hivePart = getPartitionOption(catalogTable, oldSpec)
         .map { p => toHivePartition(p.copy(spec = newSpec), hiveTable) }
@@ -529,7 +530,7 @@ private[hive] class HiveClientImpl(
       db: String,
       table: String,
       newParts: Seq[CatalogTablePartition]): Unit = withHiveState {
-    val hiveTable = toHiveTable(getTable(db, table))
+    val hiveTable = toHiveTable(getTable(db, table), Some(conf))
     client.alterPartitions(table, newParts.map { p => toHivePartition(p, hiveTable) }.asJava)
   }
 
@@ -557,7 +558,7 @@ private[hive] class HiveClientImpl(
   override def getPartitionOption(
       table: CatalogTable,
       spec: TablePartitionSpec): Option[CatalogTablePartition] = withHiveState {
-    val hiveTable = toHiveTable(table)
+    val hiveTable = toHiveTable(table, Some(conf))
     val hivePartition = client.getPartition(hiveTable, spec.asJava, false)
     Option(hivePartition).map(fromHivePartition)
   }
@@ -569,7 +570,7 @@ private[hive] class HiveClientImpl(
   override def getPartitions(
       table: CatalogTable,
       spec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = withHiveState {
-    val hiveTable = toHiveTable(table)
+    val hiveTable = toHiveTable(table, Some(conf))
     val parts = spec match {
       case None => shim.getAllPartitions(client, hiveTable).map(fromHivePartition)
       case Some(s) =>
@@ -583,7 +584,7 @@ private[hive] class HiveClientImpl(
   override def getPartitionsByFilter(
       table: CatalogTable,
       predicates: Seq[Expression]): Seq[CatalogTablePartition] = withHiveState {
-    val hiveTable = toHiveTable(table)
+    val hiveTable = toHiveTable(table, Some(conf))
     val parts = shim.getPartitionsByFilter(client, hiveTable, predicates).map(fromHivePartition)
     HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
     parts
@@ -776,20 +777,11 @@ private[hive] class HiveClientImpl(
         client.dropDatabase(db, true, false, true)
       }
   }
+}
 
-
-  /* -------------------------------------------------------- *
-   |  Helper methods for converting to and from Hive classes  |
-   * -------------------------------------------------------- */
-
-  private def toInputFormat(name: String) =
-    Utils.classForName(name).asInstanceOf[Class[_ <: org.apache.hadoop.mapred.InputFormat[_, _]]]
-
-  private def toOutputFormat(name: String) =
-    Utils.classForName(name)
-      .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
-
-  private def toHiveColumn(c: StructField): FieldSchema = {
+private[hive] object HiveClientImpl {
+  /** Converts the native StructField to Hive's FieldSchema. */
+  def toHiveColumn(c: StructField): FieldSchema = {
     val typeString = if (c.metadata.contains(HIVE_TYPE_STRING)) {
       c.metadata.getString(HIVE_TYPE_STRING)
     } else {
@@ -798,7 +790,8 @@ private[hive] class HiveClientImpl(
     new FieldSchema(c.name, typeString, c.getComment().orNull)
   }
 
-  private def fromHiveColumn(hc: FieldSchema): StructField = {
+  /** Builds the native StructField from Hive's FieldSchema. */
+  def fromHiveColumn(hc: FieldSchema): StructField = {
     val columnType = try {
       CatalystSqlParser.parseDataType(hc.getType)
     } catch {
@@ -815,7 +808,19 @@ private[hive] class HiveClientImpl(
     Option(hc.getComment).map(field.withComment).getOrElse(field)
   }
 
-  private def toHiveTable(table: CatalogTable): HiveTable = {
+  private def toInputFormat(name: String) =
+    Utils.classForName(name).asInstanceOf[Class[_ <: org.apache.hadoop.mapred.InputFormat[_, _]]]
+
+  private def toOutputFormat(name: String) =
+    Utils.classForName(name)
+      .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
+
+  /**
+   * Converts the native table metadata representation format CatalogTable to Hive's Table.
+   */
+  def toHiveTable(
+      table: CatalogTable,
+      conf: Option[HiveConf] = None): HiveTable = {
     val hiveTable = new HiveTable(table.database, table.identifier.table)
     // For EXTERNAL_TABLE, we also need to set EXTERNAL field in the table properties.
     // Otherwise, Hive metastore will change the table to a MANAGED_TABLE.
@@ -832,7 +837,9 @@ private[hive] class HiveClientImpl(
     val (partCols, schema) = table.schema.map(toHiveColumn).partition { c =>
       table.partitionColumnNames.contains(c.getName)
     }
-    if (schema.isEmpty) {
+    // after SPARK-19279, it is not allowed to create a hive table with an empty schema,
+    // so here we should not add a default col schema
+    if (schema.isEmpty && DDLUtils.isDatasourceTable(table)) {
       // This is a hack to preserve existing behavior. Before Spark 2.0, we do not
       // set a default serde here (this was done in Hive), and so if the user provides
       // an empty schema Hive would automatically populate the schema with a single
@@ -845,10 +852,10 @@ private[hive] class HiveClientImpl(
       hiveTable.setFields(schema.asJava)
     }
     hiveTable.setPartCols(partCols.asJava)
-    hiveTable.setOwner(conf.getUser)
+    conf.foreach(c => hiveTable.setOwner(c.getUser))
     hiveTable.setCreateTime((table.createTime / 1000).toInt)
     hiveTable.setLastAccessTime((table.lastAccessTime / 1000).toInt)
-    table.storage.locationUri.foreach { loc => shim.setDataLocation(hiveTable, loc) }
+    table.storage.locationUri.foreach { loc => hiveTable.getTTable.getSd.setLocation(loc)}
     table.storage.inputFormat.map(toInputFormat).foreach(hiveTable.setInputFormatClass)
     table.storage.outputFormat.map(toOutputFormat).foreach(hiveTable.setOutputFormatClass)
     hiveTable.setSerializationLib(
@@ -866,7 +873,11 @@ private[hive] class HiveClientImpl(
     hiveTable
   }
 
-  private def toHivePartition(
+  /**
+   * Converts the native partition metadata representation format CatalogTablePartition to
+   * Hive's Partition.
+   */
+  def toHivePartition(
       p: CatalogTablePartition,
       ht: HiveTable): HivePartition = {
     val tpart = new org.apache.hadoop.hive.metastore.api.Partition
@@ -891,7 +902,10 @@ private[hive] class HiveClientImpl(
     new HivePartition(ht, tpart)
   }
 
-  private def fromHivePartition(hp: HivePartition): CatalogTablePartition = {
+  /**
+   * Build the native partition metadata from Hive's Partition.
+   */
+  def fromHivePartition(hp: HivePartition): CatalogTablePartition = {
     val apiPartition = hp.getTPartition
     CatalogTablePartition(
       spec = Option(hp.getSpec).map(_.asScala.toMap).getOrElse(Map.empty),

http://git-wip-us.apache.org/repos/asf/spark/blob/3881f342/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index def6ef3..140c352 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -113,7 +113,7 @@ case class HiveTableScanExec(
       .mkString(",")
 
     hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames)
-    hiveConf.set(serdeConstants.LIST_COLUMNS, relation.attributes.map(_.name).mkString(","))
+    hiveConf.set(serdeConstants.LIST_COLUMNS, relation.dataColKeys.map(_.name).mkString(","))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/3881f342/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
index 00fdfbc..ee632d2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
@@ -134,6 +134,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
     storage = CatalogStorageFormat.empty.copy(
       properties = Map("path" -> defaultTableURI("tbl4").toString)),
     schema = new StructType(),
+    provider = Some("json"),
     properties = Map(
       "spark.sql.sources.provider" -> "json",
       "spark.sql.sources.schema.numParts" -> "1",
@@ -145,6 +146,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
     storage = CatalogStorageFormat.empty.copy(
       properties = Map("path" -> defaultTableURI("tbl5").toString)),
     schema = simpleSchema,
+    provider = Some("parquet"),
     properties = Map(
       "spark.sql.sources.provider" -> "parquet",
       "spark.sql.sources.schema.numParts" -> "1",
@@ -156,6 +158,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
     storage = CatalogStorageFormat.empty.copy(
       properties = Map("path" -> defaultTableURI("tbl6").toString)),
     schema = new StructType(),
+    provider = Some("json"),
     properties = Map(
       "spark.sql.sources.provider" -> "json",
       "spark.sql.sources.schema.numParts" -> "1",
@@ -170,6 +173,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
       locationUri = Some(defaultTableURI("tbl7").toString + "-__PLACEHOLDER__"),
       properties = Map("path" -> tempDirUri)),
     schema = new StructType(),
+    provider = Some("json"),
     properties = Map(
       "spark.sql.sources.provider" -> "json",
       "spark.sql.sources.schema.numParts" -> "1",
@@ -194,6 +198,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
       locationUri = Some(defaultTableURI("tbl9").toString + "-__PLACEHOLDER__"),
       properties = Map("path" -> tempDirUri)),
     schema = new StructType(),
+    provider = Some("json"),
     properties = Map("spark.sql.sources.provider" -> "json"))
 
   // A list of all raw tables we want to test, with their expected schema.

http://git-wip-us.apache.org/repos/asf/spark/blob/3881f342/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index cf1fe2b..e951bbe 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -748,6 +748,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         identifier = TableIdentifier(tableName, Some("default")),
         tableType = CatalogTableType.MANAGED,
         schema = new StructType,
+        provider = Some("json"),
         storage = CatalogStorageFormat(
           locationUri = None,
           inputFormat = None,
@@ -1276,6 +1277,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         identifier = TableIdentifier("t", Some("default")),
         tableType = CatalogTableType.MANAGED,
         schema = new StructType,
+        provider = Some("json"),
         storage = CatalogStorageFormat.empty,
         properties = Map(
           DATASOURCE_PROVIDER -> "json",
@@ -1373,6 +1375,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
             properties = Map("path" -> path.getAbsolutePath)
           ),
           schema = new StructType(),
+          provider = Some("parquet"),
           properties = Map(
             HiveExternalCatalog.DATASOURCE_PROVIDER -> "parquet"))
         hiveClient.createTable(tableDesc, ignoreIfExists = false)

http://git-wip-us.apache.org/repos/asf/spark/blob/3881f342/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index ca39c7e..fe14824 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.client
 import java.io.{ByteArrayOutputStream, File, PrintStream}
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
 import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
 import org.apache.hadoop.mapred.TextInputFormat
@@ -570,7 +571,6 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
       }
     }
 
-
     test(s"$version: SPARK-13709: reading partitioned Avro table with nested schema") {
       withTempDir { dir =>
         val path = dir.toURI.toString
@@ -649,6 +649,29 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
       }
     }
 
+    test(s"$version: CTAS for managed data source tables") {
+      withTable("t", "t1") {
+        import spark.implicits._
+
+        val tPath = new Path(spark.sessionState.conf.warehousePath, "t")
+        Seq("1").toDF("a").write.saveAsTable("t")
+        val expectedPath = s"file:${tPath.toUri.getPath.stripSuffix("/")}"
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+
+        assert(table.location.stripSuffix("/") == expectedPath)
+        assert(tPath.getFileSystem(spark.sessionState.newHadoopConf()).exists(tPath))
+        checkAnswer(spark.table("t"), Row("1") :: Nil)
+
+        val t1Path = new Path(spark.sessionState.conf.warehousePath, "t1")
+        spark.sql("create table t1 using parquet as select 2 as a")
+        val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+        val expectedPath1 = s"file:${t1Path.toUri.getPath.stripSuffix("/")}"
+
+        assert(table1.location.stripSuffix("/") == expectedPath1)
+        assert(t1Path.getFileSystem(spark.sessionState.newHadoopConf()).exists(t1Path))
+        checkAnswer(spark.table("t1"), Row(2) :: Nil)
+      }
+    }
     // TODO: add more tests.
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org