Posted to commits@spark.apache.org by we...@apache.org on 2017/02/10 19:07:05 UTC

spark git commit: [SPARK-19459][SQL] Add Hive datatype (char/varchar) to StructField metadata

Repository: spark
Updated Branches:
  refs/heads/master dadff5f07 -> de8a03e68


[SPARK-19459][SQL] Add Hive datatype (char/varchar) to StructField metadata

## What changes were proposed in this pull request?
Reading from an existing ORC table that contains `char` or `varchar` columns can fail with a `ClassCastException` if the table metadata has been created using Spark. This happens because Spark internally replaces `char` and `varchar` columns with `string` columns.

This PR fixes the issue by storing the original Hive type string in the `StructField`'s metadata under the `HIVE_TYPE_STRING` key. This metadata is picked up by the `HiveClient` and the ORC reader; see https://github.com/apache/spark/pull/16060 for more details on how it is used.
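
For illustration, a sketch of the field this produces for a column declared as `v VARCHAR(10)` (`HIVE_TYPE_STRING` is the constant added to the `types` package object in this patch):

```scala
import org.apache.spark.sql.types._

// Spark still models the column as a plain string, but the raw Hive type
// is preserved in the field's metadata for the Hive client and ORC reader.
val field = StructField(
  "v",
  StringType,
  nullable = true,
  new MetadataBuilder().putString(HIVE_TYPE_STRING, "varchar(10)").build())
```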

## How was this patch tested?
Added a regression test to `OrcSourceSuite`.

Author: Herman van Hovell <hv...@databricks.com>

Closes #16804 from hvanhovell/SPARK-19459.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/de8a03e6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/de8a03e6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/de8a03e6

Branch: refs/heads/master
Commit: de8a03e68202647555e30fffba551f65bc77608d
Parents: dadff5f
Author: Herman van Hovell <hv...@databricks.com>
Authored: Fri Feb 10 11:06:57 2017 -0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Feb 10 11:06:57 2017 -0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/parser/AstBuilder.scala  | 24 +++++++++++--
 .../org/apache/spark/sql/types/package.scala    | 10 +++++-
 .../spark/sql/sources/TableScanSuite.scala      |  8 +++--
 .../org/apache/spark/sql/hive/HiveUtils.scala   | 14 ++------
 .../spark/sql/hive/client/HiveClientImpl.scala  |  8 ++---
 .../spark/sql/hive/orc/OrcSourceSuite.scala     | 37 +++++++++++++++++---
 6 files changed, 76 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/de8a03e6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 3969fdb..bb07558 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -1457,8 +1457,28 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
    */
   override def visitColType(ctx: ColTypeContext): StructField = withOrigin(ctx) {
     import ctx._
-    val structField = StructField(identifier.getText, typedVisit(dataType), nullable = true)
-    if (STRING == null) structField else structField.withComment(string(STRING))
+
+    val builder = new MetadataBuilder
+    // Add comment to metadata
+    if (STRING != null) {
+      builder.putString("comment", string(STRING))
+    }
+    // Add Hive type string to metadata.
+    dataType match {
+      case p: PrimitiveDataTypeContext =>
+        p.identifier.getText.toLowerCase match {
+          case "varchar" | "char" =>
+            builder.putString(HIVE_TYPE_STRING, dataType.getText.toLowerCase)
+          case _ =>
+        }
+      case _ =>
+    }
+
+    StructField(
+      identifier.getText,
+      typedVisit(dataType),
+      nullable = true,
+      builder.build())
   }
 
   /**
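
A side note on the hunk above: the comment handling was folded into the same `MetadataBuilder`, which is equivalent to the old `withComment` call since both store the text under the `comment` key. A quick sketch:

```scala
import org.apache.spark.sql.types._

// Both forms store the comment under the "comment" metadata key,
// so StructField.getComment() behaves the same either way.
val viaBuilder = StructField("c", StringType, nullable = true,
  new MetadataBuilder().putString("comment", "customer id").build())
val viaHelper = StructField("c", StringType).withComment("customer id")
assert(viaBuilder.getComment() == viaHelper.getComment())  // Some("customer id")
```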

http://git-wip-us.apache.org/repos/asf/spark/blob/de8a03e6/sql/catalyst/src/main/scala/org/apache/spark/sql/types/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/package.scala
index 346a51e..f29cbc2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/package.scala
@@ -21,4 +21,12 @@ package org.apache.spark.sql
  * Contains a type system for attributes produced by relations, including complex types like
  * structs, arrays and maps.
  */
-package object types
+package object types {
+  /**
+   * Metadata key used to store the raw hive type string in the metadata of StructField. This
+   * is relevant for datatypes that do not have a direct Spark SQL counterpart, such as CHAR and
+   * VARCHAR. We need to preserve the original type in order to invoke the correct object
+   * inspector in Hive.
+   */
+  val HIVE_TYPE_STRING = "HIVE_TYPE_STRING"
+}
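
Putting the key in the catalyst `types` package object (rather than leaving it in `HiveUtils`) is what lets `AstBuilder`, which also lives in catalyst, tag fields without depending on the hive module; any call site just needs the package import:

```scala
import org.apache.spark.sql.types._

// The constant resolves from the package object; no hive-module dependency.
println(HIVE_TYPE_STRING)  // prints "HIVE_TYPE_STRING"
```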

http://git-wip-us.apache.org/repos/asf/spark/blob/de8a03e6/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
index 86bcb4d..b01d15e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
@@ -203,6 +203,10 @@ class TableScanSuite extends DataSourceTest with SharedSQLContext {
     (2 to 10).map(i => Row(i, i - 1)).toSeq)
 
   test("Schema and all fields") {
+    def hiveMetadata(dt: String): Metadata = {
+      new MetadataBuilder().putString(HIVE_TYPE_STRING, dt).build()
+    }
+
     val expectedSchema = StructType(
       StructField("string$%Field", StringType, true) ::
       StructField("binaryField", BinaryType, true) ::
@@ -217,8 +221,8 @@ class TableScanSuite extends DataSourceTest with SharedSQLContext {
       StructField("decimalField2", DecimalType(9, 2), true) ::
       StructField("dateField", DateType, true) ::
       StructField("timestampField", TimestampType, true) ::
-      StructField("varcharField", StringType, true) ::
-      StructField("charField", StringType, true) ::
+      StructField("varcharField", StringType, true, hiveMetadata("varchar(12)")) ::
+      StructField("charField", StringType, true, hiveMetadata("char(18)")) ::
       StructField("arrayFieldSimple", ArrayType(IntegerType), true) ::
       StructField("arrayFieldComplex",
         ArrayType(

http://git-wip-us.apache.org/repos/asf/spark/blob/de8a03e6/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 312ec67..13ab4e8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -61,14 +61,6 @@ private[spark] object HiveUtils extends Logging {
   /** The version of hive used internally by Spark SQL. */
   val hiveExecutionVersion: String = "1.2.1"
 
-  /**
-   * The property key that is used to store the raw hive type string in the metadata of StructField.
-   * For example, in the case where the Hive type is varchar, the type gets mapped to a string type
-   * in Spark SQL, but we need to preserve the original type in order to invoke the correct object
-   * inspector in Hive.
-   */
-  val hiveTypeString: String = "HIVE_TYPE_STRING"
-
   val HIVE_METASTORE_VERSION = buildConf("spark.sql.hive.metastore.version")
     .doc("Version of the Hive metastore. Available options are " +
         s"<code>0.12.0</code> through <code>$hiveExecutionVersion</code>.")
@@ -465,8 +457,8 @@ private[spark] object HiveUtils extends Logging {
 
   /** Converts the native StructField to Hive's FieldSchema. */
   private def toHiveColumn(c: StructField): FieldSchema = {
-    val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) {
-      c.metadata.getString(HiveUtils.hiveTypeString)
+    val typeString = if (c.metadata.contains(HIVE_TYPE_STRING)) {
+      c.metadata.getString(HIVE_TYPE_STRING)
     } else {
       c.dataType.catalogString
     }
@@ -482,7 +474,7 @@ private[spark] object HiveUtils extends Logging {
         throw new SparkException("Cannot recognize hive type string: " + hc.getType, e)
     }
 
-    val metadata = new MetadataBuilder().putString(HiveUtils.hiveTypeString, hc.getType).build()
+    val metadata = new MetadataBuilder().putString(HIVE_TYPE_STRING, hc.getType).build()
     val field = StructField(
       name = hc.getName,
       dataType = columnType,
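
Taken together, the two hunks above mean the Hive type string survives a round trip through the metastore schema (the identical change lands in `HiveClientImpl` below). A self-contained sketch of the conversion, simplified from the private helpers:

```scala
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.spark.sql.types._

// StructField -> FieldSchema: prefer the preserved Hive type over the
// catalog string (which would be just "string" for a varchar-backed field).
def toHive(c: StructField): FieldSchema = {
  val typeString =
    if (c.metadata.contains(HIVE_TYPE_STRING)) c.metadata.getString(HIVE_TYPE_STRING)
    else c.dataType.catalogString
  new FieldSchema(c.name, typeString, null)
}

// FieldSchema -> StructField: record the raw Hive type on the way back in.
// (Simplified: the real code parses hc.getType into a Spark DataType.)
def fromHive(hc: FieldSchema): StructField = {
  val metadata = new MetadataBuilder().putString(HIVE_TYPE_STRING, hc.getType).build()
  StructField(hc.getName, StringType, nullable = true, metadata)
}

// "varchar(10)" is preserved instead of collapsing to "string".
assert(toHive(fromHive(new FieldSchema("v", "varchar(10)", null))).getType == "varchar(10)")
```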

http://git-wip-us.apache.org/repos/asf/spark/blob/de8a03e6/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index bf703a5..f0d01eb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -47,7 +47,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.sql.hive.HiveUtils
-import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
+import org.apache.spark.sql.types._
 import org.apache.spark.util.{CircularBuffer, Utils}
 
 /**
@@ -790,8 +790,8 @@ private[hive] class HiveClientImpl(
       .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
 
   private def toHiveColumn(c: StructField): FieldSchema = {
-    val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) {
-      c.metadata.getString(HiveUtils.hiveTypeString)
+    val typeString = if (c.metadata.contains(HIVE_TYPE_STRING)) {
+      c.metadata.getString(HIVE_TYPE_STRING)
     } else {
       c.dataType.catalogString
     }
@@ -806,7 +806,7 @@ private[hive] class HiveClientImpl(
         throw new SparkException("Cannot recognize hive type string: " + hc.getType, e)
     }
 
-    val metadata = new MetadataBuilder().putString(HiveUtils.hiveTypeString, hc.getType).build()
+    val metadata = new MetadataBuilder().putString(HIVE_TYPE_STRING, hc.getType).build()
     val field = StructField(
       name = hc.getName,
       dataType = columnType,

http://git-wip-us.apache.org/repos/asf/spark/blob/de8a03e6/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
index fe1e17d..59ea891 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
@@ -152,14 +152,41 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA
     assert(new OrcOptions(Map("Orc.Compress" -> "NONE")).compressionCodec == "NONE")
   }
 
-  test("SPARK-18220: read Hive orc table with varchar column") {
+  test("SPARK-19459/SPARK-18220: read char/varchar column written by Hive") {
     val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+    val location = Utils.createTempDir()
+    val uri = location.toURI
     try {
-      hiveClient.runSqlHive("CREATE TABLE orc_varchar(a VARCHAR(10)) STORED AS orc")
-      hiveClient.runSqlHive("INSERT INTO TABLE orc_varchar SELECT 'a' FROM (SELECT 1) t")
-      checkAnswer(spark.table("orc_varchar"), Row("a"))
+      hiveClient.runSqlHive(
+        """
+           |CREATE EXTERNAL TABLE hive_orc(
+           |  a STRING,
+           |  b CHAR(10),
+           |  c VARCHAR(10))
+           |STORED AS orc""".stripMargin)
+      // Hive throws an exception if I assign the location in the create table statement.
+      hiveClient.runSqlHive(
+        s"ALTER TABLE hive_orc SET LOCATION '$uri'")
+      hiveClient.runSqlHive(
+        "INSERT INTO TABLE hive_orc SELECT 'a', 'b', 'c' FROM (SELECT 1) t")
+
+      // We create a different table in Spark using the same schema which points to
+      // the same location.
+      spark.sql(
+        s"""
+           |CREATE EXTERNAL TABLE spark_orc(
+           |  a STRING,
+           |  b CHAR(10),
+           |  c VARCHAR(10))
+           |STORED AS orc
+           |LOCATION '$uri'""".stripMargin)
+      val result = Row("a", "b         ", "c")
+      checkAnswer(spark.table("hive_orc"), result)
+      checkAnswer(spark.table("spark_orc"), result)
     } finally {
-      hiveClient.runSqlHive("DROP TABLE IF EXISTS orc_varchar")
+      hiveClient.runSqlHive("DROP TABLE IF EXISTS hive_orc")
+      hiveClient.runSqlHive("DROP TABLE IF EXISTS spark_orc")
+      Utils.deleteRecursively(location)
     }
   }
 }
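
One detail in the expected result above: Hive pads `CHAR(10)` values with spaces to the declared length, so column `b` reads back as `"b         "` (10 characters), while `VARCHAR(10)` keeps the actual length. A quick, hypothetical spark-shell check against the tables created in this test:

```scala
// Hypothetical check: lengths as seen through the Spark reader.
spark.sql("SELECT length(b), length(c) FROM spark_orc").show()
// length(b) should be 10 (CHAR pads to the declared width),
// length(c) should be 1  (VARCHAR stores the actual length).
```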

