You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/02/08 15:12:01 UTC

spark git commit: [SPARK-19279][SQL][FOLLOW-UP] Infer Schema for Hive Serde Tables

Repository: spark
Updated Branches:
  refs/heads/master 0077bfcb9 -> 4d4d0de7f


[SPARK-19279][SQL][FOLLOW-UP] Infer Schema for Hive Serde Tables

### What changes were proposed in this pull request?
`table.schema` is always not empty for partitioned tables, because `table.schema` also contains the partitioned columns, even if the original table does not have any column. This PR is to fix the issue.

### How was this patch tested?
Added a test case

Author: gatorsmile <ga...@gmail.com>

Closes #16848 from gatorsmile/inferHiveSerdeSchema.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4d4d0de7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4d4d0de7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4d4d0de7

Branch: refs/heads/master
Commit: 4d4d0de7f64cefbca28dc532b7864de9626aa241
Parents: 0077bfc
Author: gatorsmile <ga...@gmail.com>
Authored: Wed Feb 8 10:11:44 2017 -0500
Committer: gatorsmile <ga...@gmail.com>
Committed: Wed Feb 8 10:11:44 2017 -0500

----------------------------------------------------------------------
 .../spark/sql/catalyst/catalog/interface.scala  |  8 ++++
 .../org/apache/spark/sql/hive/HiveUtils.scala   |  2 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala    | 44 ++++++++++++++++++++
 3 files changed, 53 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4d4d0de7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index a8fa78d..353e595 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -194,6 +194,14 @@ case class CatalogTable(
     StructType(partitionFields)
   }
 
+  /**
+   * schema of this table's data columns
+   */
+  def dataSchema: StructType = {
+    val dataFields = schema.dropRight(partitionColumnNames.length)
+    StructType(dataFields)
+  }
+
   /** Return the database this table was specified to belong to, assuming it exists. */
   def database: String = identifier.database.getOrElse {
     throw new AnalysisException(s"table $identifier did not specify database")

http://git-wip-us.apache.org/repos/asf/spark/blob/4d4d0de7/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 30abc62..312ec67 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -580,7 +580,7 @@ private[spark] object HiveUtils extends Logging {
    * CatalogTable.
    */
   def inferSchema(table: CatalogTable): CatalogTable = {
-    if (DDLUtils.isDatasourceTable(table) || table.schema.nonEmpty) {
+    if (DDLUtils.isDatasourceTable(table) || table.dataSchema.nonEmpty) {
       table
     } else {
       val hiveTable = toHiveTable(table)

http://git-wip-us.apache.org/repos/asf/spark/blob/4d4d0de7/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index c262095..cf1fe2b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.execution.command.CreateTableCommand
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.hive.HiveExternalCatalog._
 import org.apache.spark.sql.hive.client.HiveClient
@@ -1308,6 +1309,49 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     }
   }
 
+  test("Infer schema for Hive serde tables") {
+    val tableName = "tab1"
+    val avroSchema =
+      """{
+        |  "name": "test_record",
+        |  "type": "record",
+        |  "fields": [ {
+        |    "name": "f0",
+        |    "type": "int"
+        |  }]
+        |}
+      """.stripMargin
+
+    Seq(true, false).foreach { isPartitioned =>
+      withTable(tableName) {
+        val partitionClause = if (isPartitioned) "PARTITIONED BY (ds STRING)" else ""
+        // Creates the (non-)partitioned Avro table
+        val plan = sql(
+          s"""
+             |CREATE TABLE $tableName
+             |$partitionClause
+             |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+             |STORED AS
+             |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+             |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+             |TBLPROPERTIES ('avro.schema.literal' = '$avroSchema')
+           """.stripMargin
+        ).queryExecution.analyzed
+
+        assert(plan.isInstanceOf[CreateTableCommand] &&
+          plan.asInstanceOf[CreateTableCommand].table.dataSchema.nonEmpty)
+
+        if (isPartitioned) {
+          sql(s"INSERT OVERWRITE TABLE $tableName partition (ds='a') SELECT 1")
+          checkAnswer(spark.table(tableName), Row(1, "a"))
+        } else {
+          sql(s"INSERT OVERWRITE TABLE $tableName SELECT 1")
+          checkAnswer(spark.table(tableName), Row(1))
+        }
+      }
+    }
+  }
+
   private def withDebugMode(f: => Unit): Unit = {
     val previousValue = sparkSession.sparkContext.conf.get(DEBUG_MODE)
     try {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org