Posted to commits@spark.apache.org by yh...@apache.org on 2015/10/27 05:14:30 UTC

spark git commit: [SPARK-10562] [SQL] support mixed case partitionBy column names for tables stored in metastore

Repository: spark
Updated Branches:
  refs/heads/master dc3220ce1 -> a150e6c1b


[SPARK-10562] [SQL] support mixed case partitionBy column names for tables stored in metastore

https://issues.apache.org/jira/browse/SPARK-10562

Author: Wenchen Fan <we...@databricks.com>

Closes #9226 from cloud-fan/par.
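
For context, a sketch of what the patch persists (the DataFrame and table name
are borrowed from the new test below; the property keys come from the diff):

    // Saving a table partitioned by a mixed-case column now records the
    // original-case name in the metastore table properties.
    val df = Seq(2012 -> "a").toDF("Year", "val")
    df.write.partitionBy("Year").saveAsTable("tbl10562")

    // Resulting table properties:
    //   spark.sql.sources.schema.numPartCols = "1"
    //   spark.sql.sources.schema.partCol.0   = "Year"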


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a150e6c1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a150e6c1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a150e6c1

Branch: refs/heads/master
Commit: a150e6c1b03b64a35855b8074b2fe077a6081a34
Parents: dc3220c
Author: Wenchen Fan <we...@databricks.com>
Authored: Mon Oct 26 21:14:26 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Mon Oct 26 21:14:26 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 61 ++++++++++++--------
 .../sql/hive/MetastoreDataSourcesSuite.scala    |  9 ++-
 .../sql/hive/execution/SQLQuerySuite.scala      | 11 ++++
 3 files changed, 54 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a150e6c1/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index fdb576b..f4d4571 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -143,6 +143,21 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
           }
         }
 
+        def partColsFromParts: Option[Seq[String]] = {
+          table.properties.get("spark.sql.sources.schema.numPartCols").map { numPartCols =>
+            (0 until numPartCols.toInt).map { index =>
+              val partCol = table.properties.get(s"spark.sql.sources.schema.partCol.$index").orNull
+              if (partCol == null) {
+                throw new AnalysisException(
+                  "Could not read partition columns from the metastore because it is corrupted " +
+                    s"(missing part $index of it, $numPartCols parts are expected).")
+              }
+
+              partCol
+            }
+          }
+        }
+
         // Originally, we used spark.sql.sources.schema to store the schema of a data source table.
         // After SPARK-6024, we removed this flag.
         // Although we are not using spark.sql.sources.schema any more, we still need to support it.
@@ -155,7 +170,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
         // We only need names here since the userSpecifiedSchema we loaded from the metastore
         // contains partition columns. We can always get the data types of partitioning columns
         // from userSpecifiedSchema.
-        val partitionColumns = table.partitionColumns.map(_.name)
+        val partitionColumns = partColsFromParts.getOrElse(Nil)
 
         // It does not appear that the ql client for the metastore has a way to enumerate all the
         // SerDe properties directly...
@@ -218,25 +233,21 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
       }
     }
 
-    val metastorePartitionColumns = userSpecifiedSchema.map { schema =>
-      val fields = partitionColumns.map(col => schema(col))
-      fields.map { field =>
-        HiveColumn(
-          name = field.name,
-          hiveType = HiveMetastoreTypes.toMetastoreType(field.dataType),
-          comment = "")
-      }.toSeq
-    }.getOrElse {
-      if (partitionColumns.length > 0) {
-        // The table does not have a specified schema, which means that the schema will be inferred
-        // when we load the table. So, we are not expecting partition columns and we will discover
-        // partitions when we load the table. However, if there are specified partition columns,
-        // we simply ignore them and provide a warning message.
-        logWarning(
-          s"The schema and partitions of table $tableIdent will be inferred when it is loaded. " +
-            s"Specified partition columns (${partitionColumns.mkString(",")}) will be ignored.")
+    if (userSpecifiedSchema.isDefined && partitionColumns.length > 0) {
+      tableProperties.put("spark.sql.sources.schema.numPartCols", partitionColumns.length.toString)
+      partitionColumns.zipWithIndex.foreach { case (partCol, index) =>
+        tableProperties.put(s"spark.sql.sources.schema.partCol.$index", partCol)
       }
-      Seq.empty[HiveColumn]
+    }
+
+    if (userSpecifiedSchema.isEmpty && partitionColumns.length > 0) {
+      // The table does not have a specified schema, which means that the schema will be inferred
+      // when we load the table. So, we are not expecting partition columns and we will discover
+      // partitions when we load the table. However, if there are specified partition columns,
+      // we simply ignore them and provide a warning message.
+      logWarning(
+        s"The schema and partitions of table $tableIdent will be inferred when it is loaded. " +
+          s"Specified partition columns (${partitionColumns.mkString(",")}) will be ignored.")
     }
 
     val tableType = if (isExternal) {
@@ -255,8 +266,8 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
       HiveTable(
         specifiedDatabase = Option(dbName),
         name = tblName,
-        schema = Seq.empty,
-        partitionColumns = metastorePartitionColumns,
+        schema = Nil,
+        partitionColumns = Nil,
         tableType = tableType,
         properties = tableProperties.toMap,
         serdeProperties = options)
@@ -272,14 +283,14 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
         }
       }
 
-      val partitionColumns = schemaToHiveColumn(relation.partitionColumns)
-      val dataColumns = schemaToHiveColumn(relation.schema).filterNot(partitionColumns.contains)
+      assert(partitionColumns.isEmpty)
+      assert(relation.partitionColumns.isEmpty)
 
       HiveTable(
         specifiedDatabase = Option(dbName),
         name = tblName,
-        schema = dataColumns,
-        partitionColumns = partitionColumns,
+        schema = schemaToHiveColumn(relation.schema),
+        partitionColumns = Nil,
         tableType = tableType,
         properties = tableProperties.toMap,
         serdeProperties = options,
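
As a self-contained illustration of the round trip above (a hypothetical
standalone sketch, not Spark code; PartColsSketch and its plain Map stand in
for the Hive table properties), the write side numbers the partition columns
and the read side reassembles them in order:

    // Hypothetical sketch mirroring the logic in the diff above.
    object PartColsSketch {
      private val NumPartCols = "spark.sql.sources.schema.numPartCols"
      private def partColKey(index: Int) = s"spark.sql.sources.schema.partCol.$index"

      // Write side: number each partition column, preserving its original case.
      def writeProps(partitionColumns: Seq[String]): Map[String, String] =
        if (partitionColumns.isEmpty) Map.empty
        else {
          val indexed = partitionColumns.zipWithIndex.map {
            case (partCol, index) => partColKey(index) -> partCol
          }
          (indexed :+ (NumPartCols -> partitionColumns.length.toString)).toMap
        }

      // Read side: reassemble the columns in index order, as partColsFromParts does.
      def readProps(properties: Map[String, String]): Option[Seq[String]] =
        properties.get(NumPartCols).map { numPartCols =>
          (0 until numPartCols.toInt).map { index =>
            properties.getOrElse(partColKey(index),
              sys.error(s"missing part $index, $numPartCols parts are expected"))
          }
        }

      def main(args: Array[String]): Unit = {
        val props = writeProps(Seq("Year", "Month"))
        assert(readProps(props) == Some(Seq("Year", "Month"))) // original case preserved
      }
    }

Storing one name per property key keeps the original case, which appears to be
the point of the change: the metastore lower-cases column names kept in its
schema, while table property values are stored verbatim.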

http://git-wip-us.apache.org/repos/asf/spark/blob/a150e6c1/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index d292887..f74eb15 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -753,10 +753,15 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       invalidateTable(tableName)
       val metastoreTable = catalog.client.getTable("default", tableName)
       val expectedPartitionColumns = StructType(df.schema("d") :: df.schema("b") :: Nil)
+
+      val numPartCols = metastoreTable.properties("spark.sql.sources.schema.numPartCols").toInt
+      assert(numPartCols == 2)
+
       val actualPartitionColumns =
         StructType(
-          metastoreTable.partitionColumns.map(c =>
-            StructField(c.name, HiveMetastoreTypes.toDataType(c.hiveType))))
+          (0 until numPartCols).map { index =>
+            df.schema(metastoreTable.properties(s"spark.sql.sources.schema.partCol.$index"))
+          })
       // Make sure partition columns are correctly stored in metastore.
       assert(
         expectedPartitionColumns.sameType(actualPartitionColumns),

http://git-wip-us.apache.org/repos/asf/spark/blob/a150e6c1/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 396150b..fd38064 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1410,4 +1410,15 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       }
     }
   }
+
+  test("SPARK-10562: partition by column with mixed case name") {
+    withTable("tbl10562") {
+      val df = Seq(2012 -> "a").toDF("Year", "val")
+      df.write.partitionBy("Year").saveAsTable("tbl10562")
+      checkAnswer(sql("SELECT Year FROM tbl10562"), Row(2012))
+      checkAnswer(sql("SELECT yEAr FROM tbl10562"), Row(2012))
+      checkAnswer(sql("SELECT val FROM tbl10562 WHERE Year > 2015"), Nil)
+      checkAnswer(sql("SELECT val FROM tbl10562 WHERE Year == 2012"), Row("a"))
+    }
+  }
 }

