You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2016/10/14 20:23:11 UTC

spark git commit: [SPARK-17620][SQL] Determine Serde by hive.default.fileformat when Creating Hive Serde Tables

Repository: spark
Updated Branches:
  refs/heads/master de1c1ca5c -> 7ab86244e


[SPARK-17620][SQL] Determine Serde by hive.default.fileformat when Creating Hive Serde Tables

## What changes were proposed in this pull request?
Make sure the hive.default.fileformat is used to when creating the storage format metadata.

Output
``` SQL
scala> spark.sql("SET hive.default.fileformat=orc")
res1: org.apache.spark.sql.DataFrame = [key: string, value: string]

scala> spark.sql("CREATE TABLE tmp_default(id INT)")
res2: org.apache.spark.sql.DataFrame = []
```
Before
```SQL
scala> spark.sql("DESC FORMATTED tmp_default").collect.foreach(println)
..
[# Storage Information,,]
[SerDe Library:,org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe,]
[InputFormat:,org.apache.hadoop.hive.ql.io.orc.OrcInputFormat,]
[OutputFormat:,org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat,]
[Compressed:,No,]
[Storage Desc Parameters:,,]
[  serialization.format,1,]
```
After
```SQL
scala> spark.sql("DESC FORMATTED tmp_default").collect.foreach(println)
..
[# Storage Information,,]
[SerDe Library:,org.apache.hadoop.hive.ql.io.orc.OrcSerde,]
[InputFormat:,org.apache.hadoop.hive.ql.io.orc.OrcInputFormat,]
[OutputFormat:,org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat,]
[Compressed:,No,]
[Storage Desc Parameters:,,]
[  serialization.format,1,]

```

## How was this patch tested?
Added new tests to HiveDDLCommandSuite

Author: Dilip Biswal <db...@us.ibm.com>

Closes #15190 from dilipbiswal/orc.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7ab86244
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7ab86244
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7ab86244

Branch: refs/heads/master
Commit: 7ab86244e30ca81eb4fa40ea77b4c2b8881cbab2
Parents: de1c1ca
Author: Dilip Biswal <db...@us.ibm.com>
Authored: Fri Oct 14 13:22:59 2016 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Fri Oct 14 13:22:59 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/SparkSqlParser.scala    |  4 +-
 .../spark/sql/hive/HiveDDLCommandSuite.scala    | 26 ++++++++++++-
 .../sql/hive/execution/SQLQuerySuite.scala      | 39 ++++++++++++++++++--
 3 files changed, 60 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7ab86244/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index be2eddb..8c68d1e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -1010,9 +1010,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
           .orElse(Some("org.apache.hadoop.mapred.TextInputFormat")),
         outputFormat = defaultHiveSerde.flatMap(_.outputFormat)
           .orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
-        // Note: Keep this unspecified because we use the presence of the serde to decide
-        // whether to convert a table created by CTAS to a datasource table.
-        serde = None,
+        serde = defaultHiveSerde.flatMap(_.serde),
         compressed = false,
         properties = Map())
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/7ab86244/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 9ce3338..8133749 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -30,10 +30,12 @@ import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformation}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.CreateTable
-import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.hive.test.{TestHive, TestHiveSingleton}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types.StructType
 
-class HiveDDLCommandSuite extends PlanTest {
+class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingleton {
   val parser = TestHive.sessionState.sqlParser
 
   private def extractTableDesc(sql: String): (CatalogTable, Boolean) = {
@@ -556,4 +558,24 @@ class HiveDDLCommandSuite extends PlanTest {
     assert(partition2.get.apply("c") == "1" && partition2.get.apply("d") == "2")
   }
 
+  test("Test the default fileformat for Hive-serde tables") {
+    withSQLConf("hive.default.fileformat" -> "orc") {
+      val (desc, exists) = extractTableDesc("CREATE TABLE IF NOT EXISTS fileformat_test (id int)")
+      assert(exists)
+      assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"))
+      assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"))
+      assert(desc.storage.serde == Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
+    }
+
+    withSQLConf("hive.default.fileformat" -> "parquet") {
+      val (desc, exists) = extractTableDesc("CREATE TABLE IF NOT EXISTS fileformat_test (id int)")
+      assert(exists)
+      val input = desc.storage.inputFormat
+      val output = desc.storage.outputFormat
+      val serde = desc.storage.serde
+      assert(input == Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"))
+      assert(output == Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
+      assert(serde == Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+    }
+   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7ab86244/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 6f2a166..5798f47 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -492,7 +492,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
 
   def checkRelation(
       tableName: String,
-      isDataSourceParquet: Boolean,
+      isDataSourceTable: Boolean,
       format: String,
       userSpecifiedLocation: Option[String] = None): Unit = {
     val relation = EliminateSubqueryAliases(
@@ -501,7 +501,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
     relation match {
       case LogicalRelation(r: HadoopFsRelation, _, _) =>
-        if (!isDataSourceParquet) {
+        if (!isDataSourceTable) {
           fail(
             s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " +
               s"${HadoopFsRelation.getClass.getCanonicalName}.")
@@ -514,7 +514,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         assert(catalogTable.provider.get === format)
 
       case r: MetastoreRelation =>
-        if (isDataSourceParquet) {
+        if (isDataSourceTable) {
           fail(
             s"${HadoopFsRelation.getClass.getCanonicalName} is expected, but found " +
               s"${classOf[MetastoreRelation].getCanonicalName}.")
@@ -524,8 +524,15 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
             assert(r.catalogTable.storage.locationUri.get === location)
           case None => // OK.
         }
-        // Also make sure that the format is the desired format.
+        // Also make sure that the format and serde are as desired.
         assert(catalogTable.storage.inputFormat.get.toLowerCase.contains(format))
+        assert(catalogTable.storage.outputFormat.get.toLowerCase.contains(format))
+        val serde = catalogTable.storage.serde.get
+        format match {
+          case "sequence" | "text" => assert(serde.contains("LazySimpleSerDe"))
+          case "rcfile" => assert(serde.contains("LazyBinaryColumnarSerDe"))
+          case _ => assert(serde.toLowerCase.contains(format))
+        }
     }
 
     // When a user-specified location is defined, the table type needs to be EXTERNAL.
@@ -587,6 +594,30 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
+  test("CTAS with default fileformat") {
+    val table = "ctas1"
+    val ctas = s"CREATE TABLE IF NOT EXISTS $table SELECT key k, value FROM src"
+    withSQLConf(SQLConf.CONVERT_CTAS.key -> "true") {
+      withSQLConf("hive.default.fileformat" -> "textfile") {
+        withTable(table) {
+          sql(ctas)
+          // We should use parquet here as that is the default datasource fileformat. The default
+          // datasource file format is controlled by `spark.sql.sources.default` configuration.
+          // This testcase verifies that setting `hive.default.fileformat` has no impact on
+          // the target table's fileformat in case of CTAS.
+          assert(sessionState.conf.defaultDataSourceName === "parquet")
+          checkRelation(table, isDataSourceTable = true, "parquet")
+        }
+      }
+      withSQLConf("spark.sql.sources.default" -> "orc") {
+        withTable(table) {
+          sql(ctas)
+          checkRelation(table, isDataSourceTable = true, "orc")
+         }
+      }
+    }
+  }
+
   test("CTAS without serde with location") {
     withSQLConf(SQLConf.CONVERT_CTAS.key -> "true") {
       withTempDir { dir =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org