You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/02/12 06:21:35 UTC
spark git commit: [SPARK-19448][SQL] optimize some duplication
functions between HiveClientImpl and HiveUtils
Repository: spark
Updated Branches:
refs/heads/master 0fbecc736 -> 3881f342b
[SPARK-19448][SQL] optimize some duplication functions between HiveClientImpl and HiveUtils
## What changes were proposed in this pull request?
There are some duplicate functions between `HiveClientImpl` and `HiveUtils` that can be merged into one place, such as `toHiveTable`, `toHivePartition`, and `fromHivePartition`.
Additionally, `MetastoreRelation.attributes` is renamed to `MetastoreRelation.dataColKeys`; see:
https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala#L234
## How was this patch tested?
N/A
Author: windpiger <so...@outlook.com>
Closes #16787 from windpiger/todoInMetaStoreRelation.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3881f342
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3881f342
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3881f342
Branch: refs/heads/master
Commit: 3881f342b49efdb1e0d5ee27f616451ea1928c5d
Parents: 0fbecc7
Author: windpiger <so...@outlook.com>
Authored: Sat Feb 11 22:21:14 2017 -0800
Committer: Xiao Li <ga...@gmail.com>
Committed: Sat Feb 11 22:21:14 2017 -0800
----------------------------------------------------------------------
.../org/apache/spark/sql/hive/HiveUtils.scala | 127 +------------------
.../spark/sql/hive/MetastoreRelation.scala | 13 +-
.../spark/sql/hive/client/HiveClientImpl.scala | 70 ++++++----
.../sql/hive/execution/HiveTableScanExec.scala | 2 +-
...ernalCatalogBackwardCompatibilitySuite.scala | 5 +
.../sql/hive/MetastoreDataSourcesSuite.scala | 3 +
.../spark/sql/hive/client/VersionsSuite.scala | 25 +++-
7 files changed, 88 insertions(+), 157 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/3881f342/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 13ab4e8..afc2bf8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -31,17 +31,13 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.common.`type`.HiveDecimal
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
-import org.apache.hadoop.hive.metastore.api.FieldSchema
-import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
import org.apache.hadoop.util.VersionInfo
-import org.apache.spark.{SparkConf, SparkContext, SparkException}
+import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition, CatalogTableType}
-import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.hive.client._
import org.apache.spark.sql.internal.SQLConf
@@ -455,117 +451,6 @@ private[spark] object HiveUtils extends Logging {
case (other, tpe) if primitiveTypes contains tpe => other.toString
}
- /** Converts the native StructField to Hive's FieldSchema. */
- private def toHiveColumn(c: StructField): FieldSchema = {
- val typeString = if (c.metadata.contains(HIVE_TYPE_STRING)) {
- c.metadata.getString(HIVE_TYPE_STRING)
- } else {
- c.dataType.catalogString
- }
- new FieldSchema(c.name, typeString, c.getComment.orNull)
- }
-
- /** Builds the native StructField from Hive's FieldSchema. */
- private def fromHiveColumn(hc: FieldSchema): StructField = {
- val columnType = try {
- CatalystSqlParser.parseDataType(hc.getType)
- } catch {
- case e: ParseException =>
- throw new SparkException("Cannot recognize hive type string: " + hc.getType, e)
- }
-
- val metadata = new MetadataBuilder().putString(HIVE_TYPE_STRING, hc.getType).build()
- val field = StructField(
- name = hc.getName,
- dataType = columnType,
- nullable = true,
- metadata = metadata)
- Option(hc.getComment).map(field.withComment).getOrElse(field)
- }
-
- // TODO: merge this with HiveClientImpl#toHiveTable
- /** Converts the native table metadata representation format CatalogTable to Hive's Table. */
- def toHiveTable(catalogTable: CatalogTable): HiveTable = {
- // We start by constructing an API table as Hive performs several important transformations
- // internally when converting an API table to a QL table.
- val tTable = new org.apache.hadoop.hive.metastore.api.Table()
- tTable.setTableName(catalogTable.identifier.table)
- tTable.setDbName(catalogTable.database)
-
- val tableParameters = new java.util.HashMap[String, String]()
- tTable.setParameters(tableParameters)
- catalogTable.properties.foreach { case (k, v) => tableParameters.put(k, v) }
-
- tTable.setTableType(catalogTable.tableType match {
- case CatalogTableType.EXTERNAL => HiveTableType.EXTERNAL_TABLE.toString
- case CatalogTableType.MANAGED => HiveTableType.MANAGED_TABLE.toString
- case CatalogTableType.VIEW => HiveTableType.VIRTUAL_VIEW.toString
- })
-
- val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
- tTable.setSd(sd)
-
- // Note: In Hive the schema and partition columns must be disjoint sets
- val (partCols, schema) = catalogTable.schema.map(toHiveColumn).partition { c =>
- catalogTable.partitionColumnNames.contains(c.getName)
- }
- sd.setCols(schema.asJava)
- tTable.setPartitionKeys(partCols.asJava)
-
- catalogTable.storage.locationUri.foreach(sd.setLocation)
- catalogTable.storage.inputFormat.foreach(sd.setInputFormat)
- catalogTable.storage.outputFormat.foreach(sd.setOutputFormat)
-
- val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
- catalogTable.storage.serde.foreach(serdeInfo.setSerializationLib)
- sd.setSerdeInfo(serdeInfo)
-
- val serdeParameters = new java.util.HashMap[String, String]()
- catalogTable.storage.properties.foreach { case (k, v) => serdeParameters.put(k, v) }
- serdeInfo.setParameters(serdeParameters)
-
- new HiveTable(tTable)
- }
-
- /**
- * Converts the native partition metadata representation format CatalogTablePartition to
- * Hive's Partition.
- */
- def toHivePartition(
- catalogTable: CatalogTable,
- hiveTable: HiveTable,
- partition: CatalogTablePartition): HivePartition = {
- val tPartition = new org.apache.hadoop.hive.metastore.api.Partition
- tPartition.setDbName(catalogTable.database)
- tPartition.setTableName(catalogTable.identifier.table)
- tPartition.setValues(catalogTable.partitionColumnNames.map(partition.spec(_)).asJava)
-
- val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
- tPartition.setSd(sd)
-
- // Note: In Hive the schema and partition columns must be disjoint sets
- val schema = catalogTable.schema.map(toHiveColumn).filter { c =>
- !catalogTable.partitionColumnNames.contains(c.getName)
- }
- sd.setCols(schema.asJava)
-
- partition.storage.locationUri.foreach(sd.setLocation)
- partition.storage.inputFormat.foreach(sd.setInputFormat)
- partition.storage.outputFormat.foreach(sd.setOutputFormat)
-
- val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
- sd.setSerdeInfo(serdeInfo)
- // maps and lists should be set only after all elements are ready (see HIVE-7975)
- partition.storage.serde.foreach(serdeInfo.setSerializationLib)
-
- val serdeParameters = new java.util.HashMap[String, String]()
- catalogTable.storage.properties.foreach { case (k, v) => serdeParameters.put(k, v) }
- partition.storage.properties.foreach { case (k, v) => serdeParameters.put(k, v) }
- serdeInfo.setParameters(serdeParameters)
-
- new HivePartition(hiveTable, tPartition)
- }
-
/**
* Infers the schema for Hive serde tables and returns the CatalogTable with the inferred schema.
* When the tables are data source tables or the schema already exists, returns the original
@@ -575,12 +460,12 @@ private[spark] object HiveUtils extends Logging {
if (DDLUtils.isDatasourceTable(table) || table.dataSchema.nonEmpty) {
table
} else {
- val hiveTable = toHiveTable(table)
+ val hiveTable = HiveClientImpl.toHiveTable(table)
// Note: Hive separates partition columns and the schema, but for us the
// partition columns are part of the schema
- val partCols = hiveTable.getPartCols.asScala.map(fromHiveColumn)
- val schema = StructType(hiveTable.getCols.asScala.map(fromHiveColumn) ++ partCols)
- table.copy(schema = schema)
+ val partCols = hiveTable.getPartCols.asScala.map(HiveClientImpl.fromHiveColumn)
+ val dataCols = hiveTable.getCols.asScala.map(HiveClientImpl.fromHiveColumn)
+ table.copy(schema = StructType(dataCols ++ partCols))
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/3881f342/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index 6394eb6..97b1207 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.execution.FileRelation
+import org.apache.spark.sql.hive.client.HiveClientImpl
import org.apache.spark.sql.types.StructField
@@ -56,7 +57,7 @@ private[hive] case class MetastoreRelation(
override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sparkSession :: Nil
- @transient val hiveQlTable: HiveTable = HiveUtils.toHiveTable(catalogTable)
+ @transient val hiveQlTable: HiveTable = HiveClientImpl.toHiveTable(catalogTable)
@transient override def computeStats(conf: CatalystConf): Statistics = {
catalogTable.stats.map(_.toPlanStats(output)).getOrElse(Statistics(
@@ -111,7 +112,8 @@ private[hive] case class MetastoreRelation(
} else {
allPartitions
}
- rawPartitions.map(HiveUtils.toHivePartition(catalogTable, hiveQlTable, _))
+
+ rawPartitions.map(HiveClientImpl.toHivePartition(_, hiveQlTable))
}
/** Only compare database and tablename, not alias. */
@@ -146,18 +148,17 @@ private[hive] case class MetastoreRelation(
val partitionKeys = catalogTable.partitionSchema.map(_.toAttribute)
/** Non-partitionKey attributes */
- // TODO: just make this hold the schema itself, not just non-partition columns
- val attributes = catalogTable.schema
+ val dataColKeys = catalogTable.schema
.filter { c => !catalogTable.partitionColumnNames.contains(c.name) }
.map(_.toAttribute)
- val output = attributes ++ partitionKeys
+ val output = dataColKeys ++ partitionKeys
/** An attribute map that can be used to lookup original attributes based on expression id. */
val attributeMap = AttributeMap(output.map(o => (o, o)))
/** An attribute map for determining the ordinal for non-partition columns. */
- val columnOrdinals = AttributeMap(attributes.zipWithIndex)
+ val columnOrdinals = AttributeMap(dataColKeys.zipWithIndex)
override def inputFiles: Array[String] = {
val partLocations = allPartitions
http://git-wip-us.apache.org/repos/asf/spark/blob/3881f342/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index f0d01eb..dc9c3ff 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -46,7 +46,8 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.execution.QueryExecutionException
-import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.hive.client.HiveClientImpl._
import org.apache.spark.sql.types._
import org.apache.spark.util.{CircularBuffer, Utils}
@@ -435,7 +436,7 @@ private[hive] class HiveClientImpl(
}
override def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = withHiveState {
- client.createTable(toHiveTable(table), ignoreIfExists)
+ client.createTable(toHiveTable(table, Some(conf)), ignoreIfExists)
}
override def dropTable(
@@ -447,7 +448,7 @@ private[hive] class HiveClientImpl(
}
override def alterTable(tableName: String, table: CatalogTable): Unit = withHiveState {
- val hiveTable = toHiveTable(table)
+ val hiveTable = toHiveTable(table, Some(conf))
// Do not use `table.qualifiedName` here because this may be a rename
val qualifiedTableName = s"${table.database}.$tableName"
client.alterTable(qualifiedTableName, hiveTable)
@@ -516,7 +517,7 @@ private[hive] class HiveClientImpl(
newSpecs: Seq[TablePartitionSpec]): Unit = withHiveState {
require(specs.size == newSpecs.size, "number of old and new partition specs differ")
val catalogTable = getTable(db, table)
- val hiveTable = toHiveTable(catalogTable)
+ val hiveTable = toHiveTable(catalogTable, Some(conf))
specs.zip(newSpecs).foreach { case (oldSpec, newSpec) =>
val hivePart = getPartitionOption(catalogTable, oldSpec)
.map { p => toHivePartition(p.copy(spec = newSpec), hiveTable) }
@@ -529,7 +530,7 @@ private[hive] class HiveClientImpl(
db: String,
table: String,
newParts: Seq[CatalogTablePartition]): Unit = withHiveState {
- val hiveTable = toHiveTable(getTable(db, table))
+ val hiveTable = toHiveTable(getTable(db, table), Some(conf))
client.alterPartitions(table, newParts.map { p => toHivePartition(p, hiveTable) }.asJava)
}
@@ -557,7 +558,7 @@ private[hive] class HiveClientImpl(
override def getPartitionOption(
table: CatalogTable,
spec: TablePartitionSpec): Option[CatalogTablePartition] = withHiveState {
- val hiveTable = toHiveTable(table)
+ val hiveTable = toHiveTable(table, Some(conf))
val hivePartition = client.getPartition(hiveTable, spec.asJava, false)
Option(hivePartition).map(fromHivePartition)
}
@@ -569,7 +570,7 @@ private[hive] class HiveClientImpl(
override def getPartitions(
table: CatalogTable,
spec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = withHiveState {
- val hiveTable = toHiveTable(table)
+ val hiveTable = toHiveTable(table, Some(conf))
val parts = spec match {
case None => shim.getAllPartitions(client, hiveTable).map(fromHivePartition)
case Some(s) =>
@@ -583,7 +584,7 @@ private[hive] class HiveClientImpl(
override def getPartitionsByFilter(
table: CatalogTable,
predicates: Seq[Expression]): Seq[CatalogTablePartition] = withHiveState {
- val hiveTable = toHiveTable(table)
+ val hiveTable = toHiveTable(table, Some(conf))
val parts = shim.getPartitionsByFilter(client, hiveTable, predicates).map(fromHivePartition)
HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
parts
@@ -776,20 +777,11 @@ private[hive] class HiveClientImpl(
client.dropDatabase(db, true, false, true)
}
}
+}
-
- /* -------------------------------------------------------- *
- | Helper methods for converting to and from Hive classes |
- * -------------------------------------------------------- */
-
- private def toInputFormat(name: String) =
- Utils.classForName(name).asInstanceOf[Class[_ <: org.apache.hadoop.mapred.InputFormat[_, _]]]
-
- private def toOutputFormat(name: String) =
- Utils.classForName(name)
- .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
-
- private def toHiveColumn(c: StructField): FieldSchema = {
+private[hive] object HiveClientImpl {
+ /** Converts the native StructField to Hive's FieldSchema. */
+ def toHiveColumn(c: StructField): FieldSchema = {
val typeString = if (c.metadata.contains(HIVE_TYPE_STRING)) {
c.metadata.getString(HIVE_TYPE_STRING)
} else {
@@ -798,7 +790,8 @@ private[hive] class HiveClientImpl(
new FieldSchema(c.name, typeString, c.getComment().orNull)
}
- private def fromHiveColumn(hc: FieldSchema): StructField = {
+ /** Builds the native StructField from Hive's FieldSchema. */
+ def fromHiveColumn(hc: FieldSchema): StructField = {
val columnType = try {
CatalystSqlParser.parseDataType(hc.getType)
} catch {
@@ -815,7 +808,19 @@ private[hive] class HiveClientImpl(
Option(hc.getComment).map(field.withComment).getOrElse(field)
}
- private def toHiveTable(table: CatalogTable): HiveTable = {
+ private def toInputFormat(name: String) =
+ Utils.classForName(name).asInstanceOf[Class[_ <: org.apache.hadoop.mapred.InputFormat[_, _]]]
+
+ private def toOutputFormat(name: String) =
+ Utils.classForName(name)
+ .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
+
+ /**
+ * Converts the native table metadata representation format CatalogTable to Hive's Table.
+ */
+ def toHiveTable(
+ table: CatalogTable,
+ conf: Option[HiveConf] = None): HiveTable = {
val hiveTable = new HiveTable(table.database, table.identifier.table)
// For EXTERNAL_TABLE, we also need to set EXTERNAL field in the table properties.
// Otherwise, Hive metastore will change the table to a MANAGED_TABLE.
@@ -832,7 +837,9 @@ private[hive] class HiveClientImpl(
val (partCols, schema) = table.schema.map(toHiveColumn).partition { c =>
table.partitionColumnNames.contains(c.getName)
}
- if (schema.isEmpty) {
+ // after SPARK-19279, it is not allowed to create a hive table with an empty schema,
+ // so here we should not add a default col schema
+ if (schema.isEmpty && DDLUtils.isDatasourceTable(table)) {
// This is a hack to preserve existing behavior. Before Spark 2.0, we do not
// set a default serde here (this was done in Hive), and so if the user provides
// an empty schema Hive would automatically populate the schema with a single
@@ -845,10 +852,10 @@ private[hive] class HiveClientImpl(
hiveTable.setFields(schema.asJava)
}
hiveTable.setPartCols(partCols.asJava)
- hiveTable.setOwner(conf.getUser)
+ conf.foreach(c => hiveTable.setOwner(c.getUser))
hiveTable.setCreateTime((table.createTime / 1000).toInt)
hiveTable.setLastAccessTime((table.lastAccessTime / 1000).toInt)
- table.storage.locationUri.foreach { loc => shim.setDataLocation(hiveTable, loc) }
+ table.storage.locationUri.foreach { loc => hiveTable.getTTable.getSd.setLocation(loc)}
table.storage.inputFormat.map(toInputFormat).foreach(hiveTable.setInputFormatClass)
table.storage.outputFormat.map(toOutputFormat).foreach(hiveTable.setOutputFormatClass)
hiveTable.setSerializationLib(
@@ -866,7 +873,11 @@ private[hive] class HiveClientImpl(
hiveTable
}
- private def toHivePartition(
+ /**
+ * Converts the native partition metadata representation format CatalogTablePartition to
+ * Hive's Partition.
+ */
+ def toHivePartition(
p: CatalogTablePartition,
ht: HiveTable): HivePartition = {
val tpart = new org.apache.hadoop.hive.metastore.api.Partition
@@ -891,7 +902,10 @@ private[hive] class HiveClientImpl(
new HivePartition(ht, tpart)
}
- private def fromHivePartition(hp: HivePartition): CatalogTablePartition = {
+ /**
+ * Build the native partition metadata from Hive's Partition.
+ */
+ def fromHivePartition(hp: HivePartition): CatalogTablePartition = {
val apiPartition = hp.getTPartition
CatalogTablePartition(
spec = Option(hp.getSpec).map(_.asScala.toMap).getOrElse(Map.empty),
http://git-wip-us.apache.org/repos/asf/spark/blob/3881f342/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index def6ef3..140c352 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -113,7 +113,7 @@ case class HiveTableScanExec(
.mkString(",")
hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames)
- hiveConf.set(serdeConstants.LIST_COLUMNS, relation.attributes.map(_.name).mkString(","))
+ hiveConf.set(serdeConstants.LIST_COLUMNS, relation.dataColKeys.map(_.name).mkString(","))
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/3881f342/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
index 00fdfbc..ee632d2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
@@ -134,6 +134,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
storage = CatalogStorageFormat.empty.copy(
properties = Map("path" -> defaultTableURI("tbl4").toString)),
schema = new StructType(),
+ provider = Some("json"),
properties = Map(
"spark.sql.sources.provider" -> "json",
"spark.sql.sources.schema.numParts" -> "1",
@@ -145,6 +146,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
storage = CatalogStorageFormat.empty.copy(
properties = Map("path" -> defaultTableURI("tbl5").toString)),
schema = simpleSchema,
+ provider = Some("parquet"),
properties = Map(
"spark.sql.sources.provider" -> "parquet",
"spark.sql.sources.schema.numParts" -> "1",
@@ -156,6 +158,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
storage = CatalogStorageFormat.empty.copy(
properties = Map("path" -> defaultTableURI("tbl6").toString)),
schema = new StructType(),
+ provider = Some("json"),
properties = Map(
"spark.sql.sources.provider" -> "json",
"spark.sql.sources.schema.numParts" -> "1",
@@ -170,6 +173,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
locationUri = Some(defaultTableURI("tbl7").toString + "-__PLACEHOLDER__"),
properties = Map("path" -> tempDirUri)),
schema = new StructType(),
+ provider = Some("json"),
properties = Map(
"spark.sql.sources.provider" -> "json",
"spark.sql.sources.schema.numParts" -> "1",
@@ -194,6 +198,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
locationUri = Some(defaultTableURI("tbl9").toString + "-__PLACEHOLDER__"),
properties = Map("path" -> tempDirUri)),
schema = new StructType(),
+ provider = Some("json"),
properties = Map("spark.sql.sources.provider" -> "json"))
// A list of all raw tables we want to test, with their expected schema.
http://git-wip-us.apache.org/repos/asf/spark/blob/3881f342/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index cf1fe2b..e951bbe 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -748,6 +748,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
identifier = TableIdentifier(tableName, Some("default")),
tableType = CatalogTableType.MANAGED,
schema = new StructType,
+ provider = Some("json"),
storage = CatalogStorageFormat(
locationUri = None,
inputFormat = None,
@@ -1276,6 +1277,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
identifier = TableIdentifier("t", Some("default")),
tableType = CatalogTableType.MANAGED,
schema = new StructType,
+ provider = Some("json"),
storage = CatalogStorageFormat.empty,
properties = Map(
DATASOURCE_PROVIDER -> "json",
@@ -1373,6 +1375,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
properties = Map("path" -> path.getAbsolutePath)
),
schema = new StructType(),
+ provider = Some("parquet"),
properties = Map(
HiveExternalCatalog.DATASOURCE_PROVIDER -> "parquet"))
hiveClient.createTable(tableDesc, ignoreIfExists = false)
http://git-wip-us.apache.org/repos/asf/spark/blob/3881f342/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index ca39c7e..fe14824 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.client
import java.io.{ByteArrayOutputStream, File, PrintStream}
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.hadoop.mapred.TextInputFormat
@@ -570,7 +571,6 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
}
}
-
test(s"$version: SPARK-13709: reading partitioned Avro table with nested schema") {
withTempDir { dir =>
val path = dir.toURI.toString
@@ -649,6 +649,29 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
}
}
+ test(s"$version: CTAS for managed data source tables") {
+ withTable("t", "t1") {
+ import spark.implicits._
+
+ val tPath = new Path(spark.sessionState.conf.warehousePath, "t")
+ Seq("1").toDF("a").write.saveAsTable("t")
+ val expectedPath = s"file:${tPath.toUri.getPath.stripSuffix("/")}"
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+
+ assert(table.location.stripSuffix("/") == expectedPath)
+ assert(tPath.getFileSystem(spark.sessionState.newHadoopConf()).exists(tPath))
+ checkAnswer(spark.table("t"), Row("1") :: Nil)
+
+ val t1Path = new Path(spark.sessionState.conf.warehousePath, "t1")
+ spark.sql("create table t1 using parquet as select 2 as a")
+ val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+ val expectedPath1 = s"file:${t1Path.toUri.getPath.stripSuffix("/")}"
+
+ assert(table1.location.stripSuffix("/") == expectedPath1)
+ assert(t1Path.getFileSystem(spark.sessionState.newHadoopConf()).exists(t1Path))
+ checkAnswer(spark.table("t1"), Row(2) :: Nil)
+ }
+ }
// TODO: add more tests.
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org