You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2016/12/02 04:54:19 UTC

spark git commit: [SPARK-18647][SQL] do not put provider in table properties for Hive serde table

Repository: spark
Updated Branches:
  refs/heads/master 38b9e6962 -> a5f02b002


[SPARK-18647][SQL] do not put provider in table properties for Hive serde table

## What changes were proposed in this pull request?

In Spark 2.1, we make Hive serde tables case-preserving by putting the table metadata in table properties, as we already do for data source tables. However, we should not put the table provider there, as doing so breaks forward compatibility. For example, if we create a Hive serde table with Spark 2.1 using `sql("create table test stored as parquet as select 1")`, we will fail to read it with Spark 2.0, because Spark 2.0 mistakenly treats it as a data source table when there is a `provider` entry in the table properties.

Logically, a Hive serde table's provider is always `hive`, so we don't need to store it in table properties; this PR removes it.

## How was this patch tested?

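Manually tested the forward compatibility issue. A regression test is also added in `HiveExternalCatalogSuite` to verify that the provider is not written to the table properties of a Hive serde table.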

Author: Wenchen Fan <we...@databricks.com>

Closes #16080 from cloud-fan/hive.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a5f02b00
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a5f02b00
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a5f02b00

Branch: refs/heads/master
Commit: a5f02b00291e0a22429a3dca81f12cf6d38fea0b
Parents: 38b9e69
Author: Wenchen Fan <we...@databricks.com>
Authored: Fri Dec 2 12:54:12 2016 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Dec 2 12:54:12 2016 +0800

----------------------------------------------------------------------
 .../spark/sql/hive/HiveExternalCatalog.scala    | 80 ++++++++++----------
 .../sql/hive/HiveExternalCatalogSuite.scala     | 18 +++++
 .../sql/hive/HiveMetastoreCatalogSuite.scala    |  2 -
 3 files changed, 59 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a5f02b00/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 1a9943b..0658832 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -21,6 +21,7 @@ import java.io.IOException
 import java.net.URI
 import java.util
 
+import scala.collection.mutable
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
@@ -219,9 +220,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
           // table location for tables in default database, while we expect to use the location of
           // default database.
           storage = tableDefinition.storage.copy(locationUri = tableLocation),
-          // Here we follow data source tables and put table metadata like provider, schema, etc. in
-          // table properties, so that we can work around the Hive metastore issue about not case
-          // preserving and make Hive serde table support mixed-case column names.
+          // Here we follow data source tables and put table metadata like table schema, partition
+          // columns etc. in table properties, so that we can work around the Hive metastore issue
+          // about not case preserving and make Hive serde table support mixed-case column names.
           properties = tableDefinition.properties ++ tableMetaToTableProps(tableDefinition))
         client.createTable(tableWithDataSourceProps, ignoreIfExists)
       } else {
@@ -233,10 +234,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   }
 
   private def createDataSourceTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = {
+    // data source table always have a provider, it's guaranteed by `DDLUtils.isDatasourceTable`.
+    val provider = table.provider.get
+
     // To work around some hive metastore issues, e.g. not case-preserving, bad decimal type
     // support, no column nullability, etc., we should do some extra works before saving table
     // metadata into Hive metastore:
-    //  1. Put table metadata like provider, schema, etc. in table properties.
+    //  1. Put table metadata like table schema, partition columns, etc. in table properties.
     //  2. Check if this table is hive compatible.
     //    2.1  If it's not hive compatible, set location URI, schema, partition columns and bucket
     //         spec to empty and save table metadata to Hive.
@@ -244,6 +248,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     //         it to Hive. If it fails, treat it as not hive compatible and go back to 2.1
     val tableProperties = tableMetaToTableProps(table)
 
+    // put table provider and partition provider in table properties.
+    tableProperties.put(DATASOURCE_PROVIDER, provider)
+    if (table.tracksPartitionsInCatalog) {
+      tableProperties.put(TABLE_PARTITION_PROVIDER, TABLE_PARTITION_PROVIDER_CATALOG)
+    }
+
     // Ideally we should also put `locationUri` in table properties like provider, schema, etc.
     // However, in older version of Spark we already store table location in storage properties
     // with key "path". Here we keep this behaviour for backward compatibility.
@@ -290,7 +300,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
 
     val qualifiedTableName = table.identifier.quotedString
-    val maybeSerde = HiveSerDe.sourceToSerDe(table.provider.get)
+    val maybeSerde = HiveSerDe.sourceToSerDe(provider)
     val skipHiveMetadata = table.storage.properties
       .getOrElse("skipHiveMetadata", "false").toBoolean
 
@@ -315,7 +325,6 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
         (Some(newHiveCompatibleMetastoreTable(serde)), message)
 
       case _ =>
-        val provider = table.provider.get
         val message =
           s"Couldn't find corresponding Hive SerDe for data source provider $provider. " +
             s"Persisting data source table $qualifiedTableName into Hive metastore in " +
@@ -349,21 +358,14 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   /**
    * Data source tables may be non Hive compatible and we need to store table metadata in table
    * properties to workaround some Hive metastore limitations.
-   * This method puts table provider, partition provider, schema, partition column names, bucket
-   * specification into a map, which can be used as table properties later.
+   * This method puts table schema, partition column names, bucket specification into a map, which
+   * can be used as table properties later.
    */
-  private def tableMetaToTableProps(table: CatalogTable): scala.collection.Map[String, String] = {
-    // data source table always have a provider, it's guaranteed by `DDLUtils.isDatasourceTable`.
-    val provider = table.provider.get
+  private def tableMetaToTableProps(table: CatalogTable): mutable.Map[String, String] = {
     val partitionColumns = table.partitionColumnNames
     val bucketSpec = table.bucketSpec
 
-    val properties = new scala.collection.mutable.HashMap[String, String]
-    properties.put(DATASOURCE_PROVIDER, provider)
-    if (table.tracksPartitionsInCatalog) {
-      properties.put(TABLE_PARTITION_PROVIDER, TABLE_PARTITION_PROVIDER_CATALOG)
-    }
-
+    val properties = new mutable.HashMap[String, String]
     // Serialized JSON schema string may be too long to be stored into a single metastore table
     // property. In this case, we split the JSON string and store each part as a separate table
     // property.
@@ -617,14 +619,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
 
     if (table.tableType != VIEW) {
       table.properties.get(DATASOURCE_PROVIDER) match {
-        // No provider in table properties, which means this table is created by Spark prior to 2.1,
-        // or is created at Hive side.
+        // No provider in table properties, which means this is a Hive serde table.
         case None =>
-          table = table.copy(
-            provider = Some(DDLUtils.HIVE_PROVIDER), tracksPartitionsInCatalog = true)
-
-        // This is a Hive serde table created by Spark 2.1 or higher versions.
-        case Some(DDLUtils.HIVE_PROVIDER) =>
           table = restoreHiveSerdeTable(table)
 
         // This is a regular data source table.
@@ -637,7 +633,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val statsProps = table.properties.filterKeys(_.startsWith(STATISTICS_PREFIX))
 
     if (statsProps.nonEmpty) {
-      val colStats = new scala.collection.mutable.HashMap[String, ColumnStat]
+      val colStats = new mutable.HashMap[String, ColumnStat]
 
       // For each column, recover its column stats. Note that this is currently a O(n^2) operation,
       // but given the number of columns it usually not enormous, this is probably OK as a start.
@@ -674,21 +670,27 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       provider = Some(DDLUtils.HIVE_PROVIDER),
       tracksPartitionsInCatalog = true)
 
-    val schemaFromTableProps = getSchemaFromTableProperties(table)
-    if (DataType.equalsIgnoreCaseAndNullability(schemaFromTableProps, table.schema)) {
-      hiveTable.copy(
-        schema = schemaFromTableProps,
-        partitionColumnNames = getPartitionColumnsFromTableProperties(table),
-        bucketSpec = getBucketSpecFromTableProperties(table))
+    // If this is a Hive serde table created by Spark 2.1 or higher versions, we should restore its
+    // schema from table properties.
+    if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) {
+      val schemaFromTableProps = getSchemaFromTableProperties(table)
+      if (DataType.equalsIgnoreCaseAndNullability(schemaFromTableProps, table.schema)) {
+        hiveTable.copy(
+          schema = schemaFromTableProps,
+          partitionColumnNames = getPartitionColumnsFromTableProperties(table),
+          bucketSpec = getBucketSpecFromTableProperties(table))
+      } else {
+        // Hive metastore may change the table schema, e.g. schema inference. If the table
+        // schema we read back is different(ignore case and nullability) from the one in table
+        // properties which was written when creating table, we should respect the table schema
+        // from hive.
+        logWarning(s"The table schema given by Hive metastore(${table.schema.simpleString}) is " +
+          "different from the schema when this table was created by Spark SQL" +
+          s"(${schemaFromTableProps.simpleString}). We have to fall back to the table schema " +
+          "from Hive metastore which is not case preserving.")
+        hiveTable
+      }
     } else {
-      // Hive metastore may change the table schema, e.g. schema inference. If the table
-      // schema we read back is different(ignore case and nullability) from the one in table
-      // properties which was written when creating table, we should respect the table schema
-      // from hive.
-      logWarning(s"The table schema given by Hive metastore(${table.schema.simpleString}) is " +
-        "different from the schema when this table was created by Spark SQL" +
-        s"(${schemaFromTableProps.simpleString}). We have to fall back to the table schema from " +
-        "Hive metastore which is not case preserving.")
       hiveTable
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/a5f02b00/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index efa0beb..6fee458 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -20,8 +20,11 @@ package org.apache.spark.sql.hive
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.SparkConf
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.types.StructType
 
 /**
  * Test suite for the [[HiveExternalCatalog]].
@@ -52,4 +55,19 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
     assert(selectedPartitions.length == 1)
     assert(selectedPartitions.head.spec == part1.spec)
   }
+
+  test("SPARK-18647: do not put provider in table properties for Hive serde table") {
+    val catalog = newBasicCatalog()
+    val hiveTable = CatalogTable(
+      identifier = TableIdentifier("hive_tbl", Some("db1")),
+      tableType = CatalogTableType.MANAGED,
+      storage = storageFormat,
+      schema = new StructType().add("col1", "int").add("col2", "string"),
+      provider = Some("hive"))
+    catalog.createTable(hiveTable, ignoreIfExists = false)
+
+    val rawTable = externalCatalog.client.getTable("db1", "hive_tbl")
+    assert(!rawTable.properties.contains(HiveExternalCatalog.DATASOURCE_PROVIDER))
+    assert(externalCatalog.getTable("db1", "hive_tbl").provider == Some(DDLUtils.HIVE_PROVIDER))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a5f02b00/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index 7abc4d9..0a280b4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.hive
 
-import java.io.File
-
 import org.apache.spark.sql.{QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org