Posted to commits@spark.apache.org by ma...@apache.org on 2015/02/18 00:48:18 UTC

spark git commit: [SPARK-5852][SQL]Fail to convert a newly created empty metastore parquet table to a data source parquet table.

Repository: spark
Updated Branches:
  refs/heads/master 4d4cc760f -> 117121a4e


[SPARK-5852][SQL]Fail to convert a newly created empty metastore parquet table to a data source parquet table.

The problem is that after we create an empty Hive metastore Parquet table (e.g. `CREATE TABLE test (a int) STORED AS PARQUET`), Hive creates an empty directory for us, which causes our data source `ParquetRelation2` to fail to get the schema of the table. See the JIRA for the steps to reproduce the bug and the resulting exception.
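For reference, a minimal sketch of the failure mode described above (assuming a Spark 1.3-era HiveContext; the table name is illustrative):

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.hive.HiveContext

    val sc: SparkContext = ??? // an existing SparkContext
    val hiveContext = new HiveContext(sc)

    // Hive creates an empty directory for the new table, so there are no
    // Parquet footers from which ParquetRelation2 can read a schema.
    hiveContext.sql("CREATE TABLE test (a INT) STORED AS PARQUET")

    // Before this fix, this scan failed during schema discovery; with the
    // fix it falls back to the metastore schema and returns a count of 0.
    hiveContext.sql("SELECT count(*) FROM test").collect()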

This PR is based on #4562 from chenghao-intel.

JIRA: https://issues.apache.org/jira/browse/SPARK-5852

Author: Yin Huai <yh...@databricks.com>
Author: Cheng Hao <ha...@intel.com>

Closes #4655 from yhuai/CTASParquet and squashes the following commits:

b8b3450 [Yin Huai] Update tests.
2ac94f7 [Yin Huai] Update tests.
3db3d20 [Yin Huai] Minor update.
d7e2308 [Yin Huai] Revert changes in HiveMetastoreCatalog.scala.
36978d1 [Cheng Hao] Update the code as feedback
a04930b [Cheng Hao] fix bug of scan an empty parquet based table
442ffe0 [Cheng Hao] passdown the schema for Parquet File in HiveContext


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/117121a4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/117121a4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/117121a4

Branch: refs/heads/master
Commit: 117121a4ecaadda156a82255333670775e7727db
Parents: 4d4cc76
Author: Yin Huai <yh...@databricks.com>
Authored: Tue Feb 17 15:47:59 2015 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Tue Feb 17 15:47:59 2015 -0800

----------------------------------------------------------------------
 .../apache/spark/sql/parquet/newParquet.scala   |  18 ++-
 .../sql/hive/MetastoreDataSourcesSuite.scala    |  38 +++++++
 .../spark/sql/parquet/parquetSuites.scala       | 114 ++++++++++++++++++-
 3 files changed, 164 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/117121a4/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 95bea92..16b7713 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -287,7 +287,16 @@ private[sql] case class ParquetRelation2(
         }
       }
 
-      parquetSchema = maybeSchema.getOrElse(readSchema())
+      // To get the schema, we first try the schema defined in maybeSchema.
+      // If maybeSchema is not defined, we try to read the schema from existing Parquet data
+      // (through readSchema). If no data exists, we fall back to the schema defined in
+      // maybeMetastoreSchema (passed in through the options of the data source).
+      // Finally, if we still cannot determine the schema, we throw an error.
+      parquetSchema =
+        maybeSchema
+          .orElse(readSchema())
+          .orElse(maybeMetastoreSchema)
+          .getOrElse(sys.error("Failed to get the schema."))
 
       partitionKeysIncludedInParquetSchema =
         isPartitioned &&
@@ -308,7 +317,7 @@ private[sql] case class ParquetRelation2(
       }
     }
 
-    private def readSchema(): StructType = {
+    private def readSchema(): Option[StructType] = {
       // Sees which file(s) we need to touch in order to figure out the schema.
       val filesToTouch =
       // Always tries the summary files first if users don't require a merged schema.  In this case,
@@ -611,7 +620,8 @@ private[sql] object ParquetRelation2 {
   // internally.
   private[sql] val METASTORE_SCHEMA = "metastoreSchema"
 
-  private[parquet] def readSchema(footers: Seq[Footer], sqlContext: SQLContext): StructType = {
+  private[parquet] def readSchema(
+      footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = {
     footers.map { footer =>
       val metadata = footer.getParquetMetadata.getFileMetaData
       val parquetSchema = metadata.getSchema
@@ -630,7 +640,7 @@ private[sql] object ParquetRelation2 {
             sqlContext.conf.isParquetBinaryAsString,
             sqlContext.conf.isParquetINT96AsTimestamp))
       }
-    }.reduce { (left, right) =>
+    }.reduceOption { (left, right) =>
       try left.merge(right) catch { case e: Throwable =>
         throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
       }
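
The heart of the change above is the Option-based fallback chain: readSchema now returns an Option[StructType], and reduceOption yields None when a table directory contains no Parquet footers (where reduce would have thrown), so schema discovery can fall back to the metastore schema. Below is a standalone sketch of the pattern, with a stub Schema type standing in for StructType; the names are illustrative, not the real ParquetRelation2 members.

    // Illustrative sketch only: Schema stands in for StructType, and the
    // parameters are stand-ins for the fields ParquetRelation2 consults.
    object SchemaFallbackSketch extends App {
      type Schema = String

      def resolveSchema(
          maybeSchema: Option[Schema],           // explicitly supplied schema
          footerSchemas: Seq[Schema],            // schemas read from Parquet footers
          maybeMetastoreSchema: Option[Schema]   // schema from the Hive metastore
        ): Schema = {
        // reduceOption returns None on an empty Seq, where reduce would throw;
        // this is why scanning an empty table no longer crashes.
        val readSchema: Option[Schema] =
          footerSchemas.reduceOption((left, right) => s"merge($left, $right)")

        maybeSchema
          .orElse(readSchema)
          .orElse(maybeMetastoreSchema)
          .getOrElse(sys.error("Failed to get the schema."))
      }

      // Empty table: no explicit schema, no footers -- the metastore schema wins.
      assert(resolveSchema(None, Seq.empty, Some("a INT")) == "a INT")
      println("fallback chain resolved the metastore schema")
    }

Note that Option.orElse takes its alternative by name, so in the patched code readSchema() is only evaluated when maybeSchema is empty.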

http://git-wip-us.apache.org/repos/asf/spark/blob/117121a4/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 485d5c9..c30090f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive
 
 import java.io.File
+
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.commons.io.FileUtils
@@ -30,6 +31,8 @@ import org.apache.spark.util.Utils
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
+import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.sources.LogicalRelation
 
 /**
 * Tests for persisting tables created through the data sources API into the metastore.
@@ -553,4 +556,39 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
     sql("DROP TABLE savedJsonTable")
     conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, originalDefaultSource)
   }
+
+  if (HiveShim.version == "0.13.1") {
+    test("scan a parquet table created through a CTAS statement") {
+      val originalConvertMetastore = getConf("spark.sql.hive.convertMetastoreParquet", "true")
+      val originalUseDataSource = getConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
+      setConf("spark.sql.hive.convertMetastoreParquet", "true")
+      setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
+
+      val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+      jsonRDD(rdd).registerTempTable("jt")
+      sql(
+        """
+          |create table test_parquet_ctas STORED AS parquET
+          |AS select tmp.a from jt tmp where tmp.a < 5
+        """.stripMargin)
+
+      checkAnswer(
+        sql(s"SELECT a FROM test_parquet_ctas WHERE a > 2 "),
+        Row(3) :: Row(4) :: Nil
+      )
+
+      table("test_parquet_ctas").queryExecution.analyzed match {
+        case LogicalRelation(p: ParquetRelation2) => // OK
+        case _ =>
+          fail(
+            s"test_parquet_ctas should be converted to ${classOf[ParquetRelation2].getCanonicalName}")
+      }
+
+      // Clean up and reset confs.
+      sql("DROP TABLE IF EXISTS jt")
+      sql("DROP TABLE IF EXISTS test_parquet_ctas")
+      setConf("spark.sql.hive.convertMetastoreParquet", originalConvertMetastore)
+      setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalUseDataSource)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/117121a4/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index 2acf1a7..653f4b4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -20,15 +20,15 @@ package org.apache.spark.sql.parquet
 
 import java.io.File
 
-import org.apache.spark.sql.catalyst.expressions.Row
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.{SQLConf, QueryTest}
+import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.execution.PhysicalRDD
 import org.apache.spark.sql.hive.execution.HiveTableScan
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
-
+import org.apache.spark.sql.sources.LogicalRelation
 
 // The data where the partitioning key exists only in the directory structure.
 case class ParquetData(intField: Int, stringField: String)
@@ -121,13 +121,123 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
 
   override def beforeAll(): Unit = {
     super.beforeAll()
+
+    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+    jsonRDD(rdd).registerTempTable("jt")
+
+    sql(
+      """
+        |create table test_parquet
+        |(
+        |  intField INT,
+        |  stringField STRING
+        |)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
     conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
   }
 
   override def afterAll(): Unit = {
     super.afterAll()
+    sql("DROP TABLE IF EXISTS jt")
+    sql("DROP TABLE IF EXISTS test_parquet")
+
     setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
   }
+
+  test("scan an empty parquet table") {
+    checkAnswer(sql("SELECT count(*) FROM test_parquet"), Row(0))
+  }
+
+  test("scan an empty parquet table with upper case") {
+    checkAnswer(sql("SELECT count(INTFIELD) FROM TEST_parquet"), Row(0))
+  }
+
+  test("insert into an empty parquet table") {
+    sql(
+      """
+        |create table test_insert_parquet
+        |(
+        |  intField INT,
+        |  stringField STRING
+        |)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
+    // Insert into an empty table.
+    sql("insert into table test_insert_parquet select a, b from jt where jt.a > 5")
+    checkAnswer(
+      sql(s"SELECT intField, stringField FROM test_insert_parquet WHERE intField < 8"),
+      Row(6, "str6") :: Row(7, "str7") :: Nil
+    )
+    // Insert overwrite.
+    sql("insert overwrite table test_insert_parquet select a, b from jt where jt.a < 5")
+    checkAnswer(
+      sql(s"SELECT intField, stringField FROM test_insert_parquet WHERE intField > 2"),
+      Row(3, "str3") :: Row(4, "str4") :: Nil
+    )
+    sql("DROP TABLE IF EXISTS test_insert_parquet")
+
+    // Create it again.
+    sql(
+      """
+        |create table test_insert_parquet
+        |(
+        |  intField INT,
+        |  stringField STRING
+        |)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+    // Insert overwrite an empty table.
+    sql("insert overwrite table test_insert_parquet select a, b from jt where jt.a < 5")
+    checkAnswer(
+      sql(s"SELECT intField, stringField FROM test_insert_parquet WHERE intField > 2"),
+      Row(3, "str3") :: Row(4, "str4") :: Nil
+    )
+    // Insert into the table.
+    sql("insert into table test_insert_parquet select a, b from jt")
+    checkAnswer(
+      sql(s"SELECT intField, stringField FROM test_insert_parquet"),
+      (1 to 10).map(i => Row(i, s"str$i")) ++ (1 to 4).map(i => Row(i, s"str$i"))
+    )
+    sql("DROP TABLE IF EXISTS test_insert_parquet")
+  }
+
+  test("scan a parquet table created through a CTAS statement") {
+    sql(
+      """
+        |create table test_parquet_ctas ROW FORMAT
+        |SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+        |AS select * from jt
+      """.stripMargin)
+
+    checkAnswer(
+      sql(s"SELECT a, b FROM test_parquet_ctas WHERE a = 1"),
+      Seq(Row(1, "str1"))
+    )
+
+    table("test_parquet_ctas").queryExecution.analyzed match {
+      case LogicalRelation(p: ParquetRelation2) => // OK
+      case _ =>
+        fail(
+          s"test_parquet_ctas should be converted to ${classOf[ParquetRelation2].getCanonicalName}")
+    }
+
+    sql("DROP TABLE IF EXISTS test_parquet_ctas")
+  }
 }
 
 class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {

