Posted to commits@spark.apache.org by li...@apache.org on 2018/09/22 16:44:51 UTC
spark git commit: [SPARK-25465][TEST] Refactor Parquet test suites in project Hive
Repository: spark
Updated Branches:
refs/heads/master 40edab209 -> 6ca87eb2e
[SPARK-25465][TEST] Refactor Parquet test suites in project Hive
## What changes were proposed in this pull request?
Currently, the file [parquetSuites.scala](https://github.com/apache/spark/blob/f29c2b5287563c0d6f55f936bd5a75707d7b2b1f/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala) is hard to navigate: its name does not indicate which test suites it contains.
When I tried to find the test suites for the built-in Parquet conversion of Hive serde tables, the only one I could locate at first was [HiveParquetSuite](https://github.com/apache/spark/blob/f29c2b5287563c0d6f55f936bd5a75707d7b2b1f/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala).
This PR is to:
1. Rename `ParquetMetastoreSuite` to `HiveParquetMetastoreSuite`, and create a single file for it.
2. Rename `ParquetSourceSuite` to `HiveParquetSourceSuite`, and create a single file for it.
3. Create a single file for `ParquetPartitioningTest`.
4. Delete `parquetSuites.scala`.
## How was this patch tested?
Unit test
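The refactored suites can also be run individually; as a sketch, assuming the standard `build/sbt` invocation for the `hive` module (the suite names follow the files added below):

  build/sbt "hive/testOnly org.apache.spark.sql.hive.HiveParquetMetastoreSuite"
  build/sbt "hive/testOnly org.apache.spark.sql.hive.HiveParquetSourceSuite"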
Closes #22467 from gengliangwang/refactor_parquet_suites.
Authored-by: Gengliang Wang <ge...@databricks.com>
Signed-off-by: gatorsmile <ga...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6ca87eb2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6ca87eb2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6ca87eb2
Branch: refs/heads/master
Commit: 6ca87eb2e0c60baa5faec91a12240ac50a248e72
Parents: 40edab2
Author: Gengliang Wang <ge...@databricks.com>
Authored: Sat Sep 22 09:44:46 2018 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Sat Sep 22 09:44:46 2018 -0700
----------------------------------------------------------------------
.../sql/hive/HiveParquetMetastoreSuite.scala | 659 +++++++++++
.../spark/sql/hive/HiveParquetSourceSuite.scala | 225 ++++
.../sql/hive/ParquetPartitioningTest.scala | 252 ++++
.../apache/spark/sql/hive/parquetSuites.scala | 1081 ------------------
4 files changed, 1136 insertions(+), 1081 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/6ca87eb2/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetMetastoreSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetMetastoreSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetMetastoreSuite.scala
new file mode 100644
index 0000000..0d4f040
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetMetastoreSuite.scala
@@ -0,0 +1,659 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.DataSourceScanExec
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoHadoopFsRelationCommand, LogicalRelation}
+import org.apache.spark.sql.hive.execution.HiveTableScanExec
+
+/**
+ * A suite to test the automatic conversion of metastore tables with parquet data to use the
+ * built-in Parquet support.
+ */
+class HiveParquetMetastoreSuite extends ParquetPartitioningTest {
+ import hiveContext._
+ import spark.implicits._
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ dropTables("partitioned_parquet",
+ "partitioned_parquet_with_key",
+ "partitioned_parquet_with_complextypes",
+ "partitioned_parquet_with_key_and_complextypes",
+ "normal_parquet",
+ "jt",
+ "jt_array",
+ "test_parquet")
+ sql(
+ s"""
+ |create external table partitioned_parquet
+ |(
+ | intField INT,
+ | stringField STRING
+ |)
+ |PARTITIONED BY (p int)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ | STORED AS
+ | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ |location '${partitionedTableDir.toURI}'
+ """.stripMargin)
+
+ sql(
+ s"""
+ |create external table partitioned_parquet_with_key
+ |(
+ | intField INT,
+ | stringField STRING
+ |)
+ |PARTITIONED BY (p int)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ | STORED AS
+ | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ |location '${partitionedTableDirWithKey.toURI}'
+ """.stripMargin)
+
+ sql(
+ s"""
+ |create external table normal_parquet
+ |(
+ | intField INT,
+ | stringField STRING
+ |)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ | STORED AS
+ | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ |location '${new File(normalTableDir, "normal").toURI}'
+ """.stripMargin)
+
+ sql(
+ s"""
+ |CREATE EXTERNAL TABLE partitioned_parquet_with_complextypes
+ |(
+ | intField INT,
+ | stringField STRING,
+ | structField STRUCT<intStructField: INT, stringStructField: STRING>,
+ | arrayField ARRAY<INT>
+ |)
+ |PARTITIONED BY (p int)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ | STORED AS
+ | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ |LOCATION '${partitionedTableDirWithComplexTypes.toURI}'
+ """.stripMargin)
+
+ sql(
+ s"""
+ |CREATE EXTERNAL TABLE partitioned_parquet_with_key_and_complextypes
+ |(
+ | intField INT,
+ | stringField STRING,
+ | structField STRUCT<intStructField: INT, stringStructField: STRING>,
+ | arrayField ARRAY<INT>
+ |)
+ |PARTITIONED BY (p int)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ | STORED AS
+ | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ |LOCATION '${partitionedTableDirWithKeyAndComplexTypes.toURI}'
+ """.stripMargin)
+
+ sql(
+ """
+ |create table test_parquet
+ |(
+ | intField INT,
+ | stringField STRING
+ |)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ |STORED AS
+ | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ """.stripMargin)
+
+ (1 to 10).foreach { p =>
+ sql(s"ALTER TABLE partitioned_parquet ADD PARTITION (p=$p)")
+ }
+
+ (1 to 10).foreach { p =>
+ sql(s"ALTER TABLE partitioned_parquet_with_key ADD PARTITION (p=$p)")
+ }
+
+ (1 to 10).foreach { p =>
+ sql(s"ALTER TABLE partitioned_parquet_with_key_and_complextypes ADD PARTITION (p=$p)")
+ }
+
+ (1 to 10).foreach { p =>
+ sql(s"ALTER TABLE partitioned_parquet_with_complextypes ADD PARTITION (p=$p)")
+ }
+
+ (1 to 10).map(i => (i, s"str$i")).toDF("a", "b").createOrReplaceTempView("jt")
+ (1 to 10).map(i => Tuple1(Seq(new Integer(i), null))).toDF("a")
+ .createOrReplaceTempView("jt_array")
+
+ assert(spark.sqlContext.getConf(HiveUtils.CONVERT_METASTORE_PARQUET.key) == "true")
+ }
+
+ override def afterAll(): Unit = {
+ try {
+ dropTables("partitioned_parquet",
+ "partitioned_parquet_with_key",
+ "partitioned_parquet_with_complextypes",
+ "partitioned_parquet_with_key_and_complextypes",
+ "normal_parquet",
+ "jt",
+ "jt_array",
+ "test_parquet")
+ } finally {
+ super.afterAll()
+ }
+ }
+
+ test(s"conversion is working") {
+ assert(
+ sql("SELECT * FROM normal_parquet").queryExecution.sparkPlan.collect {
+ case _: HiveTableScanExec => true
+ }.isEmpty)
+ assert(
+ sql("SELECT * FROM normal_parquet").queryExecution.sparkPlan.collect {
+ case _: DataSourceScanExec => true
+ }.nonEmpty)
+ }
+
+ test("scan an empty parquet table") {
+ checkAnswer(sql("SELECT count(*) FROM test_parquet"), Row(0))
+ }
+
+ test("scan an empty parquet table with upper case") {
+ checkAnswer(sql("SELECT count(INTFIELD) FROM TEST_parquet"), Row(0))
+ }
+
+ test("insert into an empty parquet table") {
+ dropTables("test_insert_parquet")
+ sql(
+ """
+ |create table test_insert_parquet
+ |(
+ | intField INT,
+ | stringField STRING
+ |)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ |STORED AS
+ | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ """.stripMargin)
+
+ // Insert into an empty table.
+ sql("insert into table test_insert_parquet select a, b from jt where jt.a > 5")
+ checkAnswer(
+ sql(s"SELECT intField, stringField FROM test_insert_parquet WHERE intField < 8"),
+ Row(6, "str6") :: Row(7, "str7") :: Nil
+ )
+ // Insert overwrite.
+ sql("insert overwrite table test_insert_parquet select a, b from jt where jt.a < 5")
+ checkAnswer(
+ sql(s"SELECT intField, stringField FROM test_insert_parquet WHERE intField > 2"),
+ Row(3, "str3") :: Row(4, "str4") :: Nil
+ )
+ dropTables("test_insert_parquet")
+
+ // Create it again.
+ sql(
+ """
+ |create table test_insert_parquet
+ |(
+ | intField INT,
+ | stringField STRING
+ |)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ |STORED AS
+ | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ """.stripMargin)
+ // Insert overwrite an empty table.
+ sql("insert overwrite table test_insert_parquet select a, b from jt where jt.a < 5")
+ checkAnswer(
+ sql(s"SELECT intField, stringField FROM test_insert_parquet WHERE intField > 2"),
+ Row(3, "str3") :: Row(4, "str4") :: Nil
+ )
+ // Insert into the table.
+ sql("insert into table test_insert_parquet select a, b from jt")
+ checkAnswer(
+ sql(s"SELECT intField, stringField FROM test_insert_parquet"),
+ (1 to 10).map(i => Row(i, s"str$i")) ++ (1 to 4).map(i => Row(i, s"str$i"))
+ )
+ dropTables("test_insert_parquet")
+ }
+
+ test("scan a parquet table created through a CTAS statement") {
+ withTable("test_parquet_ctas") {
+ sql(
+ """
+ |create table test_parquet_ctas ROW FORMAT
+ |SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ |STORED AS
+ | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ |AS select * from jt
+ """.stripMargin)
+
+ checkAnswer(
+ sql(s"SELECT a, b FROM test_parquet_ctas WHERE a = 1"),
+ Seq(Row(1, "str1"))
+ )
+
+ table("test_parquet_ctas").queryExecution.optimizedPlan match {
+ case LogicalRelation(_: HadoopFsRelation, _, _, _) => // OK
+ case _ => fail(
+ "test_parquet_ctas should be converted to " +
+ s"${classOf[HadoopFsRelation ].getCanonicalName }")
+ }
+ }
+ }
+
+ test("MetastoreRelation in InsertIntoTable will be converted") {
+ withTable("test_insert_parquet") {
+ sql(
+ """
+ |create table test_insert_parquet
+ |(
+ | intField INT
+ |)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ |STORED AS
+ | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ """.stripMargin)
+
+ val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
+ df.queryExecution.analyzed match {
+ case cmd: InsertIntoHadoopFsRelationCommand =>
+ assert(cmd.catalogTable.map(_.identifier.table) === Some("test_insert_parquet"))
+ case o => fail("test_insert_parquet should be converted to a " +
+ s"${classOf[HadoopFsRelation ].getCanonicalName}. However, found a ${o.toString}")
+ }
+
+ checkAnswer(
+ sql("SELECT intField FROM test_insert_parquet WHERE test_insert_parquet.intField > 5"),
+ sql("SELECT a FROM jt WHERE jt.a > 5").collect()
+ )
+ }
+ }
+
+ test("MetastoreRelation in InsertIntoHiveTable will be converted") {
+ withTable("test_insert_parquet") {
+ sql(
+ """
+ |create table test_insert_parquet
+ |(
+ | int_array array<int>
+ |)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ |STORED AS
+ | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ """.stripMargin)
+
+ val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
+ df.queryExecution.analyzed match {
+ case cmd: InsertIntoHadoopFsRelationCommand =>
+ assert(cmd.catalogTable.map(_.identifier.table) === Some("test_insert_parquet"))
+ case o => fail("test_insert_parquet should be converted to a " +
+ s"${classOf[HadoopFsRelation ].getCanonicalName}. However, found a ${o.toString}")
+ }
+
+ checkAnswer(
+ sql("SELECT int_array FROM test_insert_parquet"),
+ sql("SELECT a FROM jt_array").collect()
+ )
+ }
+ }
+
+ test("SPARK-6450 regression test") {
+ withTable("ms_convert") {
+ sql(
+ """CREATE TABLE IF NOT EXISTS ms_convert (key INT)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ |STORED AS
+ | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ """.stripMargin)
+
+ // This shouldn't throw AnalysisException
+ val analyzed = sql(
+ """SELECT key FROM ms_convert
+ |UNION ALL
+ |SELECT key FROM ms_convert
+ """.stripMargin).queryExecution.analyzed
+
+ assertResult(2) {
+ analyzed.collect {
+ case r @ LogicalRelation(_: HadoopFsRelation, _, _, _) => r
+ }.size
+ }
+ }
+ }
+
+ def collectHadoopFsRelation(df: DataFrame): HadoopFsRelation = {
+ val plan = df.queryExecution.analyzed
+ plan.collectFirst {
+ case LogicalRelation(r: HadoopFsRelation, _, _, _) => r
+ }.getOrElse {
+ fail(s"Expecting a HadoopFsRelation 2, but got:\n$plan")
+ }
+ }
+
+ test("SPARK-7749: non-partitioned metastore Parquet table lookup should use cached relation") {
+ withTable("nonPartitioned") {
+ sql(
+ """
+ |CREATE TABLE nonPartitioned (
+ | key INT,
+ | value STRING
+ |)
+ |STORED AS PARQUET
+ """.stripMargin)
+
+ // First lookup fills the cache
+ val r1 = collectHadoopFsRelation(table("nonPartitioned"))
+ // Second lookup should reuse the cache
+ val r2 = collectHadoopFsRelation(table("nonPartitioned"))
+ // They should be the same instance
+ assert(r1 eq r2)
+ }
+ }
+
+ test("SPARK-7749: partitioned metastore Parquet table lookup should use cached relation") {
+ withTable("partitioned") {
+ sql(
+ """
+ |CREATE TABLE partitioned (
+ | key INT,
+ | value STRING
+ |)
+ |PARTITIONED BY (part INT)
+ |STORED AS PARQUET
+ """.stripMargin)
+
+ // First lookup fills the cache
+ val r1 = collectHadoopFsRelation(table("partitioned"))
+ // Second lookup should reuse the cache
+ val r2 = collectHadoopFsRelation(table("partitioned"))
+ // They should be the same instance
+ assert(r1 eq r2)
+ }
+ }
+
+ test("SPARK-15968: nonempty partitioned metastore Parquet table lookup should use cached " +
+ "relation") {
+ withTable("partitioned") {
+ sql(
+ """
+ |CREATE TABLE partitioned (
+ | key INT,
+ | value STRING
+ |)
+ |PARTITIONED BY (part INT)
+ |STORED AS PARQUET
+ """.stripMargin)
+ sql("INSERT INTO TABLE partitioned PARTITION(part=0) SELECT 1 as key, 'one' as value")
+
+ // First lookup fills the cache
+ val r1 = collectHadoopFsRelation(table("partitioned"))
+ // Second lookup should reuse the cache
+ val r2 = collectHadoopFsRelation(table("partitioned"))
+ // They should be the same instance
+ assert(r1 eq r2)
+ }
+ }
+
+ private def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = {
+ sessionState.catalog.asInstanceOf[HiveSessionCatalog].metastoreCatalog
+ .getCachedDataSourceTable(table)
+ }
+
+ test("Caching converted data source Parquet Relations") {
+ def checkCached(tableIdentifier: TableIdentifier): Unit = {
+ // Converted test_parquet should be cached.
+ getCachedDataSourceTable(tableIdentifier) match {
+ case null => fail("Converted test_parquet should be cached in the cache.")
+ case LogicalRelation(_: HadoopFsRelation, _, _, _) => // OK
+ case other =>
+ fail(
+ "The cached test_parquet should be a Parquet Relation. " +
+ s"However, $other is returned form the cache.")
+ }
+ }
+
+ dropTables("test_insert_parquet", "test_parquet_partitioned_cache_test")
+
+ sql(
+ """
+ |create table test_insert_parquet
+ |(
+ | intField INT,
+ | stringField STRING
+ |)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ |STORED AS
+ | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ """.stripMargin)
+
+ var tableIdentifier = TableIdentifier("test_insert_parquet", Some("default"))
+
+ // First, make sure the converted test_parquet is not cached.
+ assert(getCachedDataSourceTable(tableIdentifier) === null)
+ // Table lookup will make the table cached.
+ table("test_insert_parquet")
+ checkCached(tableIdentifier)
+ // For insert into non-partitioned table, we will do the conversion,
+ // so the converted test_insert_parquet should be cached.
+ sessionState.refreshTable("test_insert_parquet")
+ assert(getCachedDataSourceTable(tableIdentifier) === null)
+ sql(
+ """
+ |INSERT INTO TABLE test_insert_parquet
+ |select a, b from jt
+ """.stripMargin)
+ checkCached(tableIdentifier)
+ // Make sure we can read the data.
+ checkAnswer(
+ sql("select * from test_insert_parquet"),
+ sql("select a, b from jt").collect())
+ // Invalidate the cache.
+ sessionState.refreshTable("test_insert_parquet")
+ assert(getCachedDataSourceTable(tableIdentifier) === null)
+
+ // Create a partitioned table.
+ sql(
+ """
+ |create table test_parquet_partitioned_cache_test
+ |(
+ | intField INT,
+ | stringField STRING
+ |)
+ |PARTITIONED BY (`date` string)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ |STORED AS
+ | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ """.stripMargin)
+
+ tableIdentifier = TableIdentifier("test_parquet_partitioned_cache_test", Some("default"))
+ assert(getCachedDataSourceTable(tableIdentifier) === null)
+ sql(
+ """
+ |INSERT INTO TABLE test_parquet_partitioned_cache_test
+ |PARTITION (`date`='2015-04-01')
+ |select a, b from jt
+ """.stripMargin)
+ // Right now, inserting into a partitioned Parquet table is not supported by the data source Parquet path.
+ // So, we expect it is not cached.
+ assert(getCachedDataSourceTable(tableIdentifier) === null)
+ sql(
+ """
+ |INSERT INTO TABLE test_parquet_partitioned_cache_test
+ |PARTITION (`date`='2015-04-02')
+ |select a, b from jt
+ """.stripMargin)
+ assert(getCachedDataSourceTable(tableIdentifier) === null)
+
+ // Make sure we can cache the partitioned table.
+ table("test_parquet_partitioned_cache_test")
+ checkCached(tableIdentifier)
+ // Make sure we can read the data.
+ checkAnswer(
+ sql("select STRINGField, `date`, intField from test_parquet_partitioned_cache_test"),
+ sql(
+ """
+ |select b, '2015-04-01', a FROM jt
+ |UNION ALL
+ |select b, '2015-04-02', a FROM jt
+ """.stripMargin).collect())
+
+ sessionState.refreshTable("test_parquet_partitioned_cache_test")
+ assert(getCachedDataSourceTable(tableIdentifier) === null)
+
+ dropTables("test_insert_parquet", "test_parquet_partitioned_cache_test")
+ }
+
+ test("SPARK-15248: explicitly added partitions should be readable") {
+ withTable("test_added_partitions", "test_temp") {
+ withTempDir { src =>
+ val partitionDir = new File(src, "partition").toURI
+ sql(
+ """
+ |CREATE TABLE test_added_partitions (a STRING)
+ |PARTITIONED BY (b INT)
+ |STORED AS PARQUET
+ """.stripMargin)
+
+ // Temp view that is used to insert data into partitioned table
+ Seq("foo", "bar").toDF("a").createOrReplaceTempView("test_temp")
+ sql("INSERT INTO test_added_partitions PARTITION(b='0') SELECT a FROM test_temp")
+
+ checkAnswer(
+ sql("SELECT * FROM test_added_partitions"),
+ Seq(Row("foo", 0), Row("bar", 0)))
+
+ // Create partition without data files and check whether it can be read
+ sql(s"ALTER TABLE test_added_partitions ADD PARTITION (b='1') LOCATION '$partitionDir'")
+ checkAnswer(
+ sql("SELECT * FROM test_added_partitions"),
+ Seq(Row("foo", 0), Row("bar", 0)))
+
+ // Add data files to partition directory and check whether they can be read
+ sql("INSERT INTO TABLE test_added_partitions PARTITION (b=1) select 'baz' as a")
+ checkAnswer(
+ sql("SELECT * FROM test_added_partitions"),
+ Seq(Row("foo", 0), Row("bar", 0), Row("baz", 1)))
+
+ // Check it with pruning predicates
+ checkAnswer(
+ sql("SELECT * FROM test_added_partitions where b = 0"),
+ Seq(Row("foo", 0), Row("bar", 0)))
+ checkAnswer(
+ sql("SELECT * FROM test_added_partitions where b = 1"),
+ Seq(Row("baz", 1)))
+ checkAnswer(
+ sql("SELECT * FROM test_added_partitions where b = 2"),
+ Seq.empty)
+
+ // Also verify the inputFiles implementation
+ assert(sql("select * from test_added_partitions").inputFiles.length == 2)
+ assert(sql("select * from test_added_partitions where b = 0").inputFiles.length == 1)
+ assert(sql("select * from test_added_partitions where b = 1").inputFiles.length == 1)
+ assert(sql("select * from test_added_partitions where b = 2").inputFiles.length == 0)
+ }
+ }
+ }
+
+ test("Explicitly added partitions should be readable after load") {
+ withTable("test_added_partitions") {
+ withTempDir { src =>
+ val newPartitionDir = src.toURI.toString
+ spark.range(2).selectExpr("cast(id as string)").toDF("a").write
+ .mode("overwrite")
+ .parquet(newPartitionDir)
+
+ sql(
+ """
+ |CREATE TABLE test_added_partitions (a STRING)
+ |PARTITIONED BY (b INT)
+ |STORED AS PARQUET
+ """.stripMargin)
+
+ // Create partition without data files and check whether it can be read
+ sql(s"ALTER TABLE test_added_partitions ADD PARTITION (b='1')")
+ // This table fetch is to fill the cache with zero leaf files
+ checkAnswer(spark.table("test_added_partitions"), Seq.empty)
+
+ sql(
+ s"""
+ |LOAD DATA LOCAL INPATH '$newPartitionDir' OVERWRITE
+ |INTO TABLE test_added_partitions PARTITION(b='1')
+ """.stripMargin)
+
+ checkAnswer(
+ spark.table("test_added_partitions"),
+ Seq(Row("0", 1), Row("1", 1)))
+ }
+ }
+ }
+
+ test("Non-partitioned table readable after load") {
+ withTable("tab") {
+ withTempDir { src =>
+ val newPartitionDir = src.toURI.toString
+ spark.range(2).selectExpr("cast(id as string)").toDF("a").write
+ .mode("overwrite")
+ .parquet(newPartitionDir)
+
+ sql("CREATE TABLE tab (a STRING) STORED AS PARQUET")
+
+ // This table fetch is to fill the cache with zero leaf files
+ checkAnswer(spark.table("tab"), Seq.empty)
+
+ sql(
+ s"""
+ |LOAD DATA LOCAL INPATH '$newPartitionDir' OVERWRITE
+ |INTO TABLE tab
+ """.stripMargin)
+
+ checkAnswer(spark.table("tab"), Seq(Row("0"), Row("1")))
+ }
+ }
+ }
+
+ test("self-join") {
+ val table = spark.table("normal_parquet")
+ val selfJoin = table.as("t1").crossJoin(table.as("t2"))
+ checkAnswer(selfJoin,
+ sql("SELECT * FROM normal_parquet x CROSS JOIN normal_parquet y"))
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/6ca87eb2/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSourceSuite.scala
new file mode 100644
index 0000000..de58876
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSourceSuite.scala
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import org.apache.spark.sql.{Row, SaveMode}
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
+
+/**
+ * A suite of tests for the Parquet support through the data sources API.
+ */
+class HiveParquetSourceSuite extends ParquetPartitioningTest {
+ import testImplicits._
+ import spark._
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ dropTables("partitioned_parquet",
+ "partitioned_parquet_with_key",
+ "partitioned_parquet_with_complextypes",
+ "partitioned_parquet_with_key_and_complextypes",
+ "normal_parquet")
+
+ sql(
+ s"""
+ |CREATE TEMPORARY VIEW partitioned_parquet
+ |USING org.apache.spark.sql.parquet
+ |OPTIONS (
+ | path '${partitionedTableDir.toURI}'
+ |)
+ """.stripMargin)
+
+ sql(
+ s"""
+ |CREATE TEMPORARY VIEW partitioned_parquet_with_key
+ |USING org.apache.spark.sql.parquet
+ |OPTIONS (
+ | path '${partitionedTableDirWithKey.toURI}'
+ |)
+ """.stripMargin)
+
+ sql(
+ s"""
+ |CREATE TEMPORARY VIEW normal_parquet
+ |USING org.apache.spark.sql.parquet
+ |OPTIONS (
+ | path '${new File(partitionedTableDir, "p=1").toURI}'
+ |)
+ """.stripMargin)
+
+ sql(
+ s"""
+ |CREATE TEMPORARY VIEW partitioned_parquet_with_key_and_complextypes
+ |USING org.apache.spark.sql.parquet
+ |OPTIONS (
+ | path '${partitionedTableDirWithKeyAndComplexTypes.toURI}'
+ |)
+ """.stripMargin)
+
+ sql(
+ s"""
+ |CREATE TEMPORARY VIEW partitioned_parquet_with_complextypes
+ |USING org.apache.spark.sql.parquet
+ |OPTIONS (
+ | path '${partitionedTableDirWithComplexTypes.toURI}'
+ |)
+ """.stripMargin)
+ }
+
+ test("SPARK-6016 make sure to use the latest footers") {
+ val tableName = "spark_6016_fix"
+ withTable(tableName) {
+ // Create a DataFrame with two partitions. So, the created table will have two parquet files.
+ val df1 = (1 to 10).map(Tuple1(_)).toDF("a").coalesce(2)
+ df1.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable(tableName)
+ checkAnswer(
+ sql(s"select * from $tableName"),
+ (1 to 10).map(i => Row(i))
+ )
+
+ // Create a DataFrame with four partitions. So the created table will have four parquet files.
+ val df2 = (1 to 10).map(Tuple1(_)).toDF("b").coalesce(4)
+ df2.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable(tableName)
+ // For the bug of SPARK-6016, we are caching two outdated footers for df1. Then,
+ // since the new table has four parquet files, we are trying to read new footers from two
+ // files and then merge metadata in footers of these four
+ // (two outdated ones and two latest ones), which will cause an error.
+ checkAnswer(
+ sql(s"select * from $tableName"),
+ (1 to 10).map(i => Row(i))
+ )
+ }
+ }
+
+ test("SPARK-8811: compatibility with array of struct in Hive") {
+ withTempPath { dir =>
+ withTable("array_of_struct") {
+ val conf = Seq(
+ HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false",
+ SQLConf.PARQUET_BINARY_AS_STRING.key -> "true",
+ SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "false")
+
+ withSQLConf(conf: _*) {
+ sql(
+ s"""CREATE TABLE array_of_struct
+ |STORED AS PARQUET LOCATION '${dir.toURI}'
+ |AS SELECT
+ | '1st' AS a,
+ | '2nd' AS b,
+ | ARRAY(NAMED_STRUCT('a', 'val_a', 'b', 'val_b')) AS c
+ """.stripMargin)
+
+ checkAnswer(
+ spark.read.parquet(dir.getCanonicalPath),
+ Row("1st", "2nd", Seq(Row("val_a", "val_b"))))
+ }
+ }
+ }
+ }
+
+ test("Verify the PARQUET conversion parameter: CONVERT_METASTORE_PARQUET") {
+ withTempView("single") {
+ val singleRowDF = Seq((0, "foo")).toDF("key", "value")
+ singleRowDF.createOrReplaceTempView("single")
+
+ Seq("true", "false").foreach { parquetConversion =>
+ withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> parquetConversion) {
+ val tableName = "test_parquet_ctas"
+ withTable(tableName) {
+ sql(
+ s"""
+ |CREATE TABLE $tableName STORED AS PARQUET
+ |AS SELECT tmp.key, tmp.value FROM single tmp
+ """.stripMargin)
+
+ val df = spark.sql(s"SELECT * FROM $tableName WHERE key=0")
+ checkAnswer(df, singleRowDF)
+
+ val queryExecution = df.queryExecution
+ if (parquetConversion == "true") {
+ queryExecution.analyzed.collectFirst {
+ case _: LogicalRelation =>
+ }.getOrElse {
+ fail(s"Expecting the query plan to convert parquet to data sources, " +
+ s"but got:\n$queryExecution")
+ }
+ } else {
+ queryExecution.analyzed.collectFirst {
+ case _: HiveTableRelation =>
+ }.getOrElse {
+ fail(s"Expecting no conversion from parquet to data sources, " +
+ s"but got:\n$queryExecution")
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ test("values in arrays and maps stored in parquet are always nullable") {
+ val df = createDataFrame(Tuple2(Map(2 -> 3), Seq(4, 5, 6)) :: Nil).toDF("m", "a")
+ val mapType1 = MapType(IntegerType, IntegerType, valueContainsNull = false)
+ val arrayType1 = ArrayType(IntegerType, containsNull = false)
+ val expectedSchema1 =
+ StructType(
+ StructField("m", mapType1, nullable = true) ::
+ StructField("a", arrayType1, nullable = true) :: Nil)
+ assert(df.schema === expectedSchema1)
+
+ withTable("alwaysNullable") {
+ df.write.format("parquet").saveAsTable("alwaysNullable")
+
+ val mapType2 = MapType(IntegerType, IntegerType, valueContainsNull = true)
+ val arrayType2 = ArrayType(IntegerType, containsNull = true)
+ val expectedSchema2 =
+ StructType(
+ StructField("m", mapType2, nullable = true) ::
+ StructField("a", arrayType2, nullable = true) :: Nil)
+
+ assert(table("alwaysNullable").schema === expectedSchema2)
+
+ checkAnswer(
+ sql("SELECT m, a FROM alwaysNullable"),
+ Row(Map(2 -> 3), Seq(4, 5, 6)))
+ }
+ }
+
+ test("Aggregation attribute names can't contain special chars \" ,;{}()\\n\\t=\"") {
+ withTempDir { tempDir =>
+ val filePath = new File(tempDir, "testParquet").getCanonicalPath
+ val filePath2 = new File(tempDir, "testParquet2").getCanonicalPath
+
+ val df = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str")
+ val df2 = df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("y.str").max("y.int")
+ intercept[Throwable](df2.write.parquet(filePath))
+
+ val df3 = df2.toDF("str", "max_int")
+ df3.write.parquet(filePath2)
+ val df4 = read.parquet(filePath2)
+ checkAnswer(df4, Row("1", 1) :: Row("2", 2) :: Row("3", 3) :: Nil)
+ assert(df4.columns === Array("str", "max_int"))
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/6ca87eb2/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetPartitioningTest.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetPartitioningTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetPartitioningTest.scala
new file mode 100644
index 0000000..2ae3cf4
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetPartitioningTest.scala
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.util.Utils
+
+// The data where the partitioning key exists only in the directory structure.
+case class ParquetData(intField: Int, stringField: String)
+// The data that also includes the partitioning key
+case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)
+
+case class StructContainer(intStructField: Int, stringStructField: String)
+
+case class ParquetDataWithComplexTypes(
+ intField: Int,
+ stringField: String,
+ structField: StructContainer,
+ arrayField: Seq[Int])
+
+case class ParquetDataWithKeyAndComplexTypes(
+ p: Int,
+ intField: Int,
+ stringField: String,
+ structField: StructContainer,
+ arrayField: Seq[Int])
+
+/**
+ * A collection of tests for parquet data with various forms of partitioning.
+ */
+abstract class ParquetPartitioningTest extends QueryTest with SQLTestUtils with TestHiveSingleton {
+ import testImplicits._
+
+ var partitionedTableDir: File = null
+ var normalTableDir: File = null
+ var partitionedTableDirWithKey: File = null
+ var partitionedTableDirWithComplexTypes: File = null
+ var partitionedTableDirWithKeyAndComplexTypes: File = null
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ partitionedTableDir = Utils.createTempDir()
+ normalTableDir = Utils.createTempDir()
+
+ (1 to 10).foreach { p =>
+ val partDir = new File(partitionedTableDir, s"p=$p")
+ sparkContext.makeRDD(1 to 10)
+ .map(i => ParquetData(i, s"part-$p"))
+ .toDF()
+ .write.parquet(partDir.getCanonicalPath)
+ }
+
+ sparkContext
+ .makeRDD(1 to 10)
+ .map(i => ParquetData(i, s"part-1"))
+ .toDF()
+ .write.parquet(new File(normalTableDir, "normal").getCanonicalPath)
+
+ partitionedTableDirWithKey = Utils.createTempDir()
+
+ (1 to 10).foreach { p =>
+ val partDir = new File(partitionedTableDirWithKey, s"p=$p")
+ sparkContext.makeRDD(1 to 10)
+ .map(i => ParquetDataWithKey(p, i, s"part-$p"))
+ .toDF()
+ .write.parquet(partDir.getCanonicalPath)
+ }
+
+ partitionedTableDirWithKeyAndComplexTypes = Utils.createTempDir()
+
+ (1 to 10).foreach { p =>
+ val partDir = new File(partitionedTableDirWithKeyAndComplexTypes, s"p=$p")
+ sparkContext.makeRDD(1 to 10).map { i =>
+ ParquetDataWithKeyAndComplexTypes(
+ p, i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i)
+ }.toDF().write.parquet(partDir.getCanonicalPath)
+ }
+
+ partitionedTableDirWithComplexTypes = Utils.createTempDir()
+
+ (1 to 10).foreach { p =>
+ val partDir = new File(partitionedTableDirWithComplexTypes, s"p=$p")
+ sparkContext.makeRDD(1 to 10).map { i =>
+ ParquetDataWithComplexTypes(i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i)
+ }.toDF().write.parquet(partDir.getCanonicalPath)
+ }
+ }
+
+ override protected def afterAll(): Unit = {
+ try {
+ partitionedTableDir.delete()
+ normalTableDir.delete()
+ partitionedTableDirWithKey.delete()
+ partitionedTableDirWithComplexTypes.delete()
+ partitionedTableDirWithKeyAndComplexTypes.delete()
+ } finally {
+ super.afterAll()
+ }
+ }
+
+ /**
+ * Drop named tables if they exist
+ *
+ * @param tableNames tables to drop
+ */
+ def dropTables(tableNames: String*): Unit = {
+ tableNames.foreach { name =>
+ sql(s"DROP TABLE IF EXISTS $name")
+ }
+ }
+
+ Seq(
+ "partitioned_parquet",
+ "partitioned_parquet_with_key",
+ "partitioned_parquet_with_complextypes",
+ "partitioned_parquet_with_key_and_complextypes").foreach { table =>
+
+ test(s"ordering of the partitioning columns $table") {
+ checkAnswer(
+ sql(s"SELECT p, stringField FROM $table WHERE p = 1"),
+ Seq.fill(10)(Row(1, "part-1"))
+ )
+
+ checkAnswer(
+ sql(s"SELECT stringField, p FROM $table WHERE p = 1"),
+ Seq.fill(10)(Row("part-1", 1))
+ )
+ }
+
+ test(s"project the partitioning column $table") {
+ checkAnswer(
+ sql(s"SELECT p, count(*) FROM $table group by p"),
+ Row(1, 10) ::
+ Row(2, 10) ::
+ Row(3, 10) ::
+ Row(4, 10) ::
+ Row(5, 10) ::
+ Row(6, 10) ::
+ Row(7, 10) ::
+ Row(8, 10) ::
+ Row(9, 10) ::
+ Row(10, 10) :: Nil
+ )
+ }
+
+ test(s"project partitioning and non-partitioning columns $table") {
+ checkAnswer(
+ sql(s"SELECT stringField, p, count(intField) FROM $table GROUP BY p, stringField"),
+ Row("part-1", 1, 10) ::
+ Row("part-2", 2, 10) ::
+ Row("part-3", 3, 10) ::
+ Row("part-4", 4, 10) ::
+ Row("part-5", 5, 10) ::
+ Row("part-6", 6, 10) ::
+ Row("part-7", 7, 10) ::
+ Row("part-8", 8, 10) ::
+ Row("part-9", 9, 10) ::
+ Row("part-10", 10, 10) :: Nil
+ )
+ }
+
+ test(s"simple count $table") {
+ checkAnswer(
+ sql(s"SELECT COUNT(*) FROM $table"),
+ Row(100))
+ }
+
+ test(s"pruned count $table") {
+ checkAnswer(
+ sql(s"SELECT COUNT(*) FROM $table WHERE p = 1"),
+ Row(10))
+ }
+
+ test(s"non-existent partition $table") {
+ checkAnswer(
+ sql(s"SELECT COUNT(*) FROM $table WHERE p = 1000"),
+ Row(0))
+ }
+
+ test(s"multi-partition pruned count $table") {
+ checkAnswer(
+ sql(s"SELECT COUNT(*) FROM $table WHERE p IN (1,2,3)"),
+ Row(30))
+ }
+
+ test(s"non-partition predicates $table") {
+ checkAnswer(
+ sql(s"SELECT COUNT(*) FROM $table WHERE intField IN (1,2,3)"),
+ Row(30))
+ }
+
+ test(s"sum $table") {
+ checkAnswer(
+ sql(s"SELECT SUM(intField) FROM $table WHERE intField IN (1,2,3) AND p = 1"),
+ Row(1 + 2 + 3))
+ }
+
+ test(s"hive udfs $table") {
+ checkAnswer(
+ sql(s"SELECT concat(stringField, stringField) FROM $table"),
+ sql(s"SELECT stringField FROM $table").rdd.map {
+ case Row(s: String) => Row(s + s)
+ }.collect().toSeq)
+ }
+ }
+
+ Seq(
+ "partitioned_parquet_with_key_and_complextypes",
+ "partitioned_parquet_with_complextypes").foreach { table =>
+
+ test(s"SPARK-5775 read struct from $table") {
+ checkAnswer(
+ sql(
+ s"""
+ |SELECT p, structField.intStructField, structField.stringStructField
+ |FROM $table WHERE p = 1
+ """.stripMargin),
+ (1 to 10).map(i => Row(1, i, f"${i}_string")))
+ }
+
+ test(s"SPARK-5775 read array from $table") {
+ checkAnswer(
+ sql(s"SELECT arrayField, p FROM $table WHERE p = 1"),
+ (1 to 10).map(i => Row((1 to i).toArray, 1)))
+ }
+ }
+
+ test("non-part select(*)") {
+ checkAnswer(
+ sql("SELECT COUNT(*) FROM normal_parquet"),
+ Row(10))
+ }
+}
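With `ParquetPartitioningTest` now in its own file, it acts as the shared base for the two suites above. A minimal sketch of how a hypothetical new suite could reuse its fixtures (the class name and test below are illustrative, not part of this commit):

  package org.apache.spark.sql.hive

  // Hypothetical example: reuse the temp-directory fixtures set up by the shared base class.
  class MyParquetSuite extends ParquetPartitioningTest {
    test("read the shared partitioned directory") {
      // The base class's beforeAll writes 10 partitions (p=1..10) with 10 rows each.
      val df = spark.read.parquet(partitionedTableDir.getCanonicalPath)
      assert(df.count() === 100)
    }
  }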
http://git-wip-us.apache.org/repos/asf/spark/blob/6ca87eb2/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
deleted file mode 100644
index e82d457..0000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ /dev/null
@@ -1,1081 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import java.io.File
-
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.DataSourceScanExec
-import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.hive.execution.HiveTableScanExec
-import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils
-
-// The data where the partitioning key exists only in the directory structure.
-case class ParquetData(intField: Int, stringField: String)
-// The data that also includes the partitioning key
-case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)
-
-case class StructContainer(intStructField: Int, stringStructField: String)
-
-case class ParquetDataWithComplexTypes(
- intField: Int,
- stringField: String,
- structField: StructContainer,
- arrayField: Seq[Int])
-
-case class ParquetDataWithKeyAndComplexTypes(
- p: Int,
- intField: Int,
- stringField: String,
- structField: StructContainer,
- arrayField: Seq[Int])
-
-/**
- * A suite to test the automatic conversion of metastore tables with parquet data to use the
- * built in parquet support.
- */
-class ParquetMetastoreSuite extends ParquetPartitioningTest {
- import hiveContext._
- import spark.implicits._
-
- override def beforeAll(): Unit = {
- super.beforeAll()
- dropTables("partitioned_parquet",
- "partitioned_parquet_with_key",
- "partitioned_parquet_with_complextypes",
- "partitioned_parquet_with_key_and_complextypes",
- "normal_parquet",
- "jt",
- "jt_array",
- "test_parquet")
- sql(s"""
- create external table partitioned_parquet
- (
- intField INT,
- stringField STRING
- )
- PARTITIONED BY (p int)
- ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
- STORED AS
- INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
- OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
- location '${partitionedTableDir.toURI}'
- """)
-
- sql(s"""
- create external table partitioned_parquet_with_key
- (
- intField INT,
- stringField STRING
- )
- PARTITIONED BY (p int)
- ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
- STORED AS
- INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
- OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
- location '${partitionedTableDirWithKey.toURI}'
- """)
-
- sql(s"""
- create external table normal_parquet
- (
- intField INT,
- stringField STRING
- )
- ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
- STORED AS
- INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
- OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
- location '${new File(normalTableDir, "normal").toURI}'
- """)
-
- sql(s"""
- CREATE EXTERNAL TABLE partitioned_parquet_with_complextypes
- (
- intField INT,
- stringField STRING,
- structField STRUCT<intStructField: INT, stringStructField: STRING>,
- arrayField ARRAY<INT>
- )
- PARTITIONED BY (p int)
- ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
- STORED AS
- INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
- OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
- LOCATION '${partitionedTableDirWithComplexTypes.toURI}'
- """)
-
- sql(s"""
- CREATE EXTERNAL TABLE partitioned_parquet_with_key_and_complextypes
- (
- intField INT,
- stringField STRING,
- structField STRUCT<intStructField: INT, stringStructField: STRING>,
- arrayField ARRAY<INT>
- )
- PARTITIONED BY (p int)
- ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
- STORED AS
- INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
- OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
- LOCATION '${partitionedTableDirWithKeyAndComplexTypes.toURI}'
- """)
-
- sql(
- """
- |create table test_parquet
- |(
- | intField INT,
- | stringField STRING
- |)
- |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
- |STORED AS
- | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
- | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
- """.stripMargin)
-
- (1 to 10).foreach { p =>
- sql(s"ALTER TABLE partitioned_parquet ADD PARTITION (p=$p)")
- }
-
- (1 to 10).foreach { p =>
- sql(s"ALTER TABLE partitioned_parquet_with_key ADD PARTITION (p=$p)")
- }
-
- (1 to 10).foreach { p =>
- sql(s"ALTER TABLE partitioned_parquet_with_key_and_complextypes ADD PARTITION (p=$p)")
- }
-
- (1 to 10).foreach { p =>
- sql(s"ALTER TABLE partitioned_parquet_with_complextypes ADD PARTITION (p=$p)")
- }
-
- (1 to 10).map(i => (i, s"str$i")).toDF("a", "b").createOrReplaceTempView("jt")
- (1 to 10).map(i => Tuple1(Seq(new Integer(i), null))).toDF("a")
- .createOrReplaceTempView("jt_array")
-
- assert(spark.sqlContext.getConf(HiveUtils.CONVERT_METASTORE_PARQUET.key) == "true")
- }
-
- override def afterAll(): Unit = {
- try {
- dropTables("partitioned_parquet",
- "partitioned_parquet_with_key",
- "partitioned_parquet_with_complextypes",
- "partitioned_parquet_with_key_and_complextypes",
- "normal_parquet",
- "jt",
- "jt_array",
- "test_parquet")
- } finally {
- super.afterAll()
- }
- }
-
- test(s"conversion is working") {
- assert(
- sql("SELECT * FROM normal_parquet").queryExecution.sparkPlan.collect {
- case _: HiveTableScanExec => true
- }.isEmpty)
- assert(
- sql("SELECT * FROM normal_parquet").queryExecution.sparkPlan.collect {
- case _: DataSourceScanExec => true
- }.nonEmpty)
- }
-
- test("scan an empty parquet table") {
- checkAnswer(sql("SELECT count(*) FROM test_parquet"), Row(0))
- }
-
- test("scan an empty parquet table with upper case") {
- checkAnswer(sql("SELECT count(INTFIELD) FROM TEST_parquet"), Row(0))
- }
-
- test("insert into an empty parquet table") {
- dropTables("test_insert_parquet")
- sql(
- """
- |create table test_insert_parquet
- |(
- | intField INT,
- | stringField STRING
- |)
- |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
- |STORED AS
- | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
- | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
- """.stripMargin)
-
- // Insert into am empty table.
- sql("insert into table test_insert_parquet select a, b from jt where jt.a > 5")
- checkAnswer(
- sql(s"SELECT intField, stringField FROM test_insert_parquet WHERE intField < 8"),
- Row(6, "str6") :: Row(7, "str7") :: Nil
- )
- // Insert overwrite.
- sql("insert overwrite table test_insert_parquet select a, b from jt where jt.a < 5")
- checkAnswer(
- sql(s"SELECT intField, stringField FROM test_insert_parquet WHERE intField > 2"),
- Row(3, "str3") :: Row(4, "str4") :: Nil
- )
- dropTables("test_insert_parquet")
-
- // Create it again.
- sql(
- """
- |create table test_insert_parquet
- |(
- | intField INT,
- | stringField STRING
- |)
- |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
- |STORED AS
- | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
- | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
- """.stripMargin)
- // Insert overwrite an empty table.
- sql("insert overwrite table test_insert_parquet select a, b from jt where jt.a < 5")
- checkAnswer(
- sql(s"SELECT intField, stringField FROM test_insert_parquet WHERE intField > 2"),
- Row(3, "str3") :: Row(4, "str4") :: Nil
- )
- // Insert into the table.
- sql("insert into table test_insert_parquet select a, b from jt")
- checkAnswer(
- sql(s"SELECT intField, stringField FROM test_insert_parquet"),
- (1 to 10).map(i => Row(i, s"str$i")) ++ (1 to 4).map(i => Row(i, s"str$i"))
- )
- dropTables("test_insert_parquet")
- }
-
- test("scan a parquet table created through a CTAS statement") {
- withTable("test_parquet_ctas") {
- sql(
- """
- |create table test_parquet_ctas ROW FORMAT
- |SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
- |STORED AS
- | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
- | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
- |AS select * from jt
- """.stripMargin)
-
- checkAnswer(
- sql(s"SELECT a, b FROM test_parquet_ctas WHERE a = 1"),
- Seq(Row(1, "str1"))
- )
-
- table("test_parquet_ctas").queryExecution.optimizedPlan match {
- case LogicalRelation(_: HadoopFsRelation, _, _, _) => // OK
- case _ => fail(
- "test_parquet_ctas should be converted to " +
- s"${classOf[HadoopFsRelation ].getCanonicalName }")
- }
- }
- }
-
- test("MetastoreRelation in InsertIntoTable will be converted") {
- withTable("test_insert_parquet") {
- sql(
- """
- |create table test_insert_parquet
- |(
- | intField INT
- |)
- |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
- |STORED AS
- | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
- | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
- """.stripMargin)
-
- val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
- df.queryExecution.analyzed match {
- case cmd: InsertIntoHadoopFsRelationCommand =>
- assert(cmd.catalogTable.map(_.identifier.table) === Some("test_insert_parquet"))
- case o => fail("test_insert_parquet should be converted to a " +
- s"${classOf[HadoopFsRelation ].getCanonicalName}. However, found a ${o.toString}")
- }
-
- checkAnswer(
- sql("SELECT intField FROM test_insert_parquet WHERE test_insert_parquet.intField > 5"),
- sql("SELECT a FROM jt WHERE jt.a > 5").collect()
- )
- }
- }
-
- test("MetastoreRelation in InsertIntoHiveTable will be converted") {
- withTable("test_insert_parquet") {
- sql(
- """
- |create table test_insert_parquet
- |(
- | int_array array<int>
- |)
- |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
- |STORED AS
- | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
- | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
- """.stripMargin)
-
- val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
- df.queryExecution.analyzed match {
- case cmd: InsertIntoHadoopFsRelationCommand =>
- assert(cmd.catalogTable.map(_.identifier.table) === Some("test_insert_parquet"))
- case o => fail("test_insert_parquet should be converted to a " +
- s"${classOf[HadoopFsRelation ].getCanonicalName}. However, found a ${o.toString}")
- }
-
- checkAnswer(
- sql("SELECT int_array FROM test_insert_parquet"),
- sql("SELECT a FROM jt_array").collect()
- )
- }
- }
-
- test("SPARK-6450 regression test") {
- withTable("ms_convert") {
- sql(
- """CREATE TABLE IF NOT EXISTS ms_convert (key INT)
- |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
- |STORED AS
- | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
- | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
- """.stripMargin)
-
- // This shouldn't throw AnalysisException
- val analyzed = sql(
- """SELECT key FROM ms_convert
- |UNION ALL
- |SELECT key FROM ms_convert
- """.stripMargin).queryExecution.analyzed
-
- assertResult(2) {
- analyzed.collect {
- case r @ LogicalRelation(_: HadoopFsRelation, _, _, _) => r
- }.size
- }
- }
- }
-
- def collectHadoopFsRelation(df: DataFrame): HadoopFsRelation = {
- val plan = df.queryExecution.analyzed
- plan.collectFirst {
- case LogicalRelation(r: HadoopFsRelation, _, _, _) => r
- }.getOrElse {
- fail(s"Expecting a HadoopFsRelation 2, but got:\n$plan")
- }
- }
-
- test("SPARK-7749: non-partitioned metastore Parquet table lookup should use cached relation") {
- withTable("nonPartitioned") {
- sql(
- """
- |CREATE TABLE nonPartitioned (
- | key INT,
- | value STRING
- |)
- |STORED AS PARQUET
- """.stripMargin)
-
- // First lookup fills the cache
- val r1 = collectHadoopFsRelation(table("nonPartitioned"))
- // Second lookup should reuse the cache
- val r2 = collectHadoopFsRelation(table("nonPartitioned"))
- // They should be the same instance
- assert(r1 eq r2)
- }
- }
-
- test("SPARK-7749: partitioned metastore Parquet table lookup should use cached relation") {
- withTable("partitioned") {
- sql(
- """
- |CREATE TABLE partitioned (
- | key INT,
- | value STRING
- |)
- |PARTITIONED BY (part INT)
- |STORED AS PARQUET
- """.stripMargin)
-
- // First lookup fills the cache
- val r1 = collectHadoopFsRelation(table("partitioned"))
- // Second lookup should reuse the cache
- val r2 = collectHadoopFsRelation(table("partitioned"))
- // They should be the same instance
- assert(r1 eq r2)
- }
- }
-
- test("SPARK-15968: nonempty partitioned metastore Parquet table lookup should use cached " +
- "relation") {
- withTable("partitioned") {
- sql(
- """
- |CREATE TABLE partitioned (
- | key INT,
- | value STRING
- |)
- |PARTITIONED BY (part INT)
- |STORED AS PARQUET
- """.stripMargin)
- sql("INSERT INTO TABLE partitioned PARTITION(part=0) SELECT 1 as key, 'one' as value")
-
- // First lookup fills the cache
- val r1 = collectHadoopFsRelation(table("partitioned"))
- // Second lookup should reuse the cache
- val r2 = collectHadoopFsRelation(table("partitioned"))
- // They should be the same instance
- assert(r1 eq r2)
- }
- }
-
- private def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = {
- sessionState.catalog.asInstanceOf[HiveSessionCatalog].metastoreCatalog
- .getCachedDataSourceTable(table)
- }
-
- test("Caching converted data source Parquet Relations") {
- def checkCached(tableIdentifier: TableIdentifier): Unit = {
- // Converted test_parquet should be cached.
- getCachedDataSourceTable(tableIdentifier) match {
- case null => fail("Converted test_parquet should be cached in the cache.")
- case LogicalRelation(_: HadoopFsRelation, _, _, _) => // OK
- case other =>
- fail(
- "The cached test_parquet should be a Parquet Relation. " +
- s"However, $other is returned form the cache.")
- }
- }
-
- dropTables("test_insert_parquet", "test_parquet_partitioned_cache_test")
-
- sql(
- """
- |create table test_insert_parquet
- |(
- | intField INT,
- | stringField STRING
- |)
- |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
- |STORED AS
- | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
- | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
- """.stripMargin)
-
- var tableIdentifier = TableIdentifier("test_insert_parquet", Some("default"))
-
- // First, make sure the converted test_parquet is not cached.
- assert(getCachedDataSourceTable(tableIdentifier) === null)
- // Table lookup will make the table cached.
- table("test_insert_parquet")
- checkCached(tableIdentifier)
- // For insert into non-partitioned table, we will do the conversion,
- // so the converted test_insert_parquet should be cached.
- sessionState.refreshTable("test_insert_parquet")
- assert(getCachedDataSourceTable(tableIdentifier) === null)
- sql(
- """
- |INSERT INTO TABLE test_insert_parquet
- |select a, b from jt
- """.stripMargin)
- checkCached(tableIdentifier)
- // Make sure we can read the data.
- checkAnswer(
- sql("select * from test_insert_parquet"),
- sql("select a, b from jt").collect())
- // Invalidate the cache.
- sessionState.refreshTable("test_insert_parquet")
- assert(getCachedDataSourceTable(tableIdentifier) === null)
-
- // Create a partitioned table.
- sql(
- """
- |create table test_parquet_partitioned_cache_test
- |(
- | intField INT,
- | stringField STRING
- |)
- |PARTITIONED BY (`date` string)
- |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
- |STORED AS
- | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
- | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
- """.stripMargin)
-
- tableIdentifier = TableIdentifier("test_parquet_partitioned_cache_test", Some("default"))
- assert(getCachedDataSourceTable(tableIdentifier) === null)
- sql(
- """
- |INSERT INTO TABLE test_parquet_partitioned_cache_test
- |PARTITION (`date`='2015-04-01')
- |select a, b from jt
- """.stripMargin)
-    // Right now, inserting into a partitioned Parquet table is not supported by the
-    // data source Parquet path, so we expect the relation is not cached.
- assert(getCachedDataSourceTable(tableIdentifier) === null)
- sql(
- """
- |INSERT INTO TABLE test_parquet_partitioned_cache_test
- |PARTITION (`date`='2015-04-02')
- |select a, b from jt
- """.stripMargin)
- assert(getCachedDataSourceTable(tableIdentifier) === null)
-
- // Make sure we can cache the partitioned table.
- table("test_parquet_partitioned_cache_test")
- checkCached(tableIdentifier)
- // Make sure we can read the data.
- checkAnswer(
- sql("select STRINGField, `date`, intField from test_parquet_partitioned_cache_test"),
- sql(
- """
- |select b, '2015-04-01', a FROM jt
- |UNION ALL
- |select b, '2015-04-02', a FROM jt
- """.stripMargin).collect())
-
- sessionState.refreshTable("test_parquet_partitioned_cache_test")
- assert(getCachedDataSourceTable(tableIdentifier) === null)
-
- dropTables("test_insert_parquet", "test_parquet_partitioned_cache_test")
- }
-
- test("SPARK-15248: explicitly added partitions should be readable") {
- withTable("test_added_partitions", "test_temp") {
- withTempDir { src =>
- val partitionDir = new File(src, "partition").toURI
- sql(
- """
- |CREATE TABLE test_added_partitions (a STRING)
- |PARTITIONED BY (b INT)
- |STORED AS PARQUET
- """.stripMargin)
-
- // Temp view that is used to insert data into partitioned table
- Seq("foo", "bar").toDF("a").createOrReplaceTempView("test_temp")
- sql("INSERT INTO test_added_partitions PARTITION(b='0') SELECT a FROM test_temp")
-
- checkAnswer(
- sql("SELECT * FROM test_added_partitions"),
- Seq(Row("foo", 0), Row("bar", 0)))
-
- // Create partition without data files and check whether it can be read
- sql(s"ALTER TABLE test_added_partitions ADD PARTITION (b='1') LOCATION '$partitionDir'")
- checkAnswer(
- sql("SELECT * FROM test_added_partitions"),
- Seq(Row("foo", 0), Row("bar", 0)))
-
- // Add data files to partition directory and check whether they can be read
- sql("INSERT INTO TABLE test_added_partitions PARTITION (b=1) select 'baz' as a")
- checkAnswer(
- sql("SELECT * FROM test_added_partitions"),
- Seq(Row("foo", 0), Row("bar", 0), Row("baz", 1)))
-
- // Check it with pruning predicates
- checkAnswer(
- sql("SELECT * FROM test_added_partitions where b = 0"),
- Seq(Row("foo", 0), Row("bar", 0)))
- checkAnswer(
- sql("SELECT * FROM test_added_partitions where b = 1"),
- Seq(Row("baz", 1)))
- checkAnswer(
- sql("SELECT * FROM test_added_partitions where b = 2"),
- Seq.empty)
-
- // Also verify the inputFiles implementation
- assert(sql("select * from test_added_partitions").inputFiles.length == 2)
- assert(sql("select * from test_added_partitions where b = 0").inputFiles.length == 1)
- assert(sql("select * from test_added_partitions where b = 1").inputFiles.length == 1)
- assert(sql("select * from test_added_partitions where b = 2").inputFiles.length == 0)
- }
- }
- }
-
- test("Explicitly added partitions should be readable after load") {
- withTable("test_added_partitions") {
- withTempDir { src =>
- val newPartitionDir = src.toURI.toString
- spark.range(2).selectExpr("cast(id as string)").toDF("a").write
- .mode("overwrite")
- .parquet(newPartitionDir)
-
- sql(
- """
- |CREATE TABLE test_added_partitions (a STRING)
- |PARTITIONED BY (b INT)
- |STORED AS PARQUET
- """.stripMargin)
-
- // Create partition without data files and check whether it can be read
- sql(s"ALTER TABLE test_added_partitions ADD PARTITION (b='1')")
- // This table fetch is to fill the cache with zero leaf files
- checkAnswer(spark.table("test_added_partitions"), Seq.empty)
-
- sql(
- s"""
- |LOAD DATA LOCAL INPATH '$newPartitionDir' OVERWRITE
- |INTO TABLE test_added_partitions PARTITION(b='1')
- """.stripMargin)
-
- checkAnswer(
- spark.table("test_added_partitions"),
- Seq(Row("0", 1), Row("1", 1)))
- }
- }
- }
-
- test("Non-partitioned table readable after load") {
- withTable("tab") {
- withTempDir { src =>
- val newPartitionDir = src.toURI.toString
- spark.range(2).selectExpr("cast(id as string)").toDF("a").write
- .mode("overwrite")
- .parquet(newPartitionDir)
-
- sql("CREATE TABLE tab (a STRING) STORED AS PARQUET")
-
- // This table fetch is to fill the cache with zero leaf files
- checkAnswer(spark.table("tab"), Seq.empty)
-
- sql(
- s"""
- |LOAD DATA LOCAL INPATH '$newPartitionDir' OVERWRITE
- |INTO TABLE tab
- """.stripMargin)
-
- checkAnswer(spark.table("tab"), Seq(Row("0"), Row("1")))
- }
- }
- }
-
- test("self-join") {
- val table = spark.table("normal_parquet")
- val selfJoin = table.as("t1").crossJoin(table.as("t2"))
- checkAnswer(selfJoin,
- sql("SELECT * FROM normal_parquet x CROSS JOIN normal_parquet y"))
- }
-}
-
-/**
- * A suite of tests for the Parquet support through the data sources API.
- */
-class ParquetSourceSuite extends ParquetPartitioningTest {
- import testImplicits._
- import spark._
-
- override def beforeAll(): Unit = {
- super.beforeAll()
- dropTables("partitioned_parquet",
- "partitioned_parquet_with_key",
- "partitioned_parquet_with_complextypes",
- "partitioned_parquet_with_key_and_complextypes",
- "normal_parquet")
-
-    sql(s"""
- CREATE TEMPORARY VIEW partitioned_parquet
- USING org.apache.spark.sql.parquet
- OPTIONS (
- path '${partitionedTableDir.toURI}'
- )
- """)
-
-    sql(s"""
- CREATE TEMPORARY VIEW partitioned_parquet_with_key
- USING org.apache.spark.sql.parquet
- OPTIONS (
- path '${partitionedTableDirWithKey.toURI}'
- )
- """)
-
-    sql(s"""
- CREATE TEMPORARY VIEW normal_parquet
- USING org.apache.spark.sql.parquet
- OPTIONS (
- path '${new File(partitionedTableDir, "p=1").toURI}'
- )
- """)
-
-    sql(s"""
- CREATE TEMPORARY VIEW partitioned_parquet_with_key_and_complextypes
- USING org.apache.spark.sql.parquet
- OPTIONS (
- path '${partitionedTableDirWithKeyAndComplexTypes.toURI}'
- )
- """)
-
-    sql(s"""
- CREATE TEMPORARY VIEW partitioned_parquet_with_complextypes
- USING org.apache.spark.sql.parquet
- OPTIONS (
- path '${partitionedTableDirWithComplexTypes.toURI}'
- )
- """)
- }
-
- test("SPARK-6016 make sure to use the latest footers") {
- sql("drop table if exists spark_6016_fix")
-
- // Create a DataFrame with two partitions. So, the created table will have two parquet files.
- val df1 = (1 to 10).map(Tuple1(_)).toDF("a").coalesce(2)
- df1.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("spark_6016_fix")
- checkAnswer(
- sql("select * from spark_6016_fix"),
- (1 to 10).map(i => Row(i))
- )
-
- // Create a DataFrame with four partitions. So, the created table will have four parquet files.
- val df2 = (1 to 10).map(Tuple1(_)).toDF("b").coalesce(4)
- df2.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("spark_6016_fix")
-    // With the SPARK-6016 bug, we would be caching two outdated footers for df1. Then, since
-    // the new table has four parquet files, we would read new footers from only two files and
-    // merge the metadata of these four footers (the two outdated ones and the two latest ones),
-    // which would cause an error.
- checkAnswer(
- sql("select * from spark_6016_fix"),
- (1 to 10).map(i => Row(i))
- )
-
- sql("drop table spark_6016_fix")
- }
-
- test("SPARK-8811: compatibility with array of struct in Hive") {
- withTempPath { dir =>
- withTable("array_of_struct") {
- val conf = Seq(
- HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false",
- SQLConf.PARQUET_BINARY_AS_STRING.key -> "true",
- SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "false")
-
- withSQLConf(conf: _*) {
- sql(
- s"""CREATE TABLE array_of_struct
- |STORED AS PARQUET LOCATION '${dir.toURI}'
- |AS SELECT
- | '1st' AS a,
- | '2nd' AS b,
- | ARRAY(NAMED_STRUCT('a', 'val_a', 'b', 'val_b')) AS c
- """.stripMargin)
-
- checkAnswer(
- spark.read.parquet(dir.getCanonicalPath),
- Row("1st", "2nd", Seq(Row("val_a", "val_b"))))
- }
- }
- }
- }
-
- test("Verify the PARQUET conversion parameter: CONVERT_METASTORE_PARQUET") {
- withTempView("single") {
- val singleRowDF = Seq((0, "foo")).toDF("key", "value")
- singleRowDF.createOrReplaceTempView("single")
-
- Seq("true", "false").foreach { parquetConversion =>
- withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> parquetConversion) {
- val tableName = "test_parquet_ctas"
- withTable(tableName) {
- sql(
- s"""
- |CREATE TABLE $tableName STORED AS PARQUET
- |AS SELECT tmp.key, tmp.value FROM single tmp
- """.stripMargin)
-
- val df = spark.sql(s"SELECT * FROM $tableName WHERE key=0")
- checkAnswer(df, singleRowDF)
-
- val queryExecution = df.queryExecution
- if (parquetConversion == "true") {
- queryExecution.analyzed.collectFirst {
- case _: LogicalRelation =>
- }.getOrElse {
- fail(s"Expecting the query plan to convert parquet to data sources, " +
- s"but got:\n$queryExecution")
- }
- } else {
- queryExecution.analyzed.collectFirst {
- case _: HiveTableRelation =>
- }.getOrElse {
- fail(s"Expecting no conversion from parquet to data sources, " +
- s"but got:\n$queryExecution")
- }
- }
- }
- }
- }
- }
- }
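-
-  // A minimal sketch, assuming an existing Hive Parquet serde table, of flipping the same
-  // conversion flag outside the test harness. The table name below is illustrative; the conf
-  // key is the string form of HiveUtils.CONVERT_METASTORE_PARQUET.
-  //   spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")
-  //   spark.sql("SELECT * FROM some_hive_parquet_table").explain()  // typically a Hive table scan
-  //   spark.conf.set("spark.sql.hive.convertMetastoreParquet", "true")
-  //   spark.sql("SELECT * FROM some_hive_parquet_table").explain()  // typically a Parquet file scan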
-
- test("values in arrays and maps stored in parquet are always nullable") {
- val df = createDataFrame(Tuple2(Map(2 -> 3), Seq(4, 5, 6)) :: Nil).toDF("m", "a")
- val mapType1 = MapType(IntegerType, IntegerType, valueContainsNull = false)
- val arrayType1 = ArrayType(IntegerType, containsNull = false)
- val expectedSchema1 =
- StructType(
- StructField("m", mapType1, nullable = true) ::
- StructField("a", arrayType1, nullable = true) :: Nil)
- assert(df.schema === expectedSchema1)
-
- withTable("alwaysNullable") {
- df.write.format("parquet").saveAsTable("alwaysNullable")
-
- val mapType2 = MapType(IntegerType, IntegerType, valueContainsNull = true)
- val arrayType2 = ArrayType(IntegerType, containsNull = true)
- val expectedSchema2 =
- StructType(
- StructField("m", mapType2, nullable = true) ::
- StructField("a", arrayType2, nullable = true) :: Nil)
-
- assert(table("alwaysNullable").schema === expectedSchema2)
-
- checkAnswer(
- sql("SELECT m, a FROM alwaysNullable"),
- Row(Map(2 -> 3), Seq(4, 5, 6)))
- }
- }
-
- test("Aggregation attribute names can't contain special chars \" ,;{}()\\n\\t=\"") {
- val tempDir = Utils.createTempDir()
- val filePath = new File(tempDir, "testParquet").getCanonicalPath
- val filePath2 = new File(tempDir, "testParquet2").getCanonicalPath
-
- val df = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str")
- val df2 = df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("y.str").max("y.int")
- intercept[Throwable](df2.write.parquet(filePath))
-
- val df3 = df2.toDF("str", "max_int")
- df3.write.parquet(filePath2)
- val df4 = read.parquet(filePath2)
- checkAnswer(df4, Row("1", 1) :: Row("2", 2) :: Row("3", 3) :: Nil)
- assert(df4.columns === Array("str", "max_int"))
- }
-}
-
-/**
- * A collection of tests for parquet data with various forms of partitioning.
- */
-abstract class ParquetPartitioningTest extends QueryTest with SQLTestUtils with TestHiveSingleton {
- import testImplicits._
-
- var partitionedTableDir: File = null
- var normalTableDir: File = null
- var partitionedTableDirWithKey: File = null
- var partitionedTableDirWithComplexTypes: File = null
- var partitionedTableDirWithKeyAndComplexTypes: File = null
-
- override def beforeAll(): Unit = {
- super.beforeAll()
- partitionedTableDir = Utils.createTempDir()
- normalTableDir = Utils.createTempDir()
-
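-    // partitionedTableDir: ten Hive-style partition directories (p=1 .. p=10), each holding ten
-    // ParquetData rows; the concrete suites register the partitioned_parquet table over it.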
- (1 to 10).foreach { p =>
- val partDir = new File(partitionedTableDir, s"p=$p")
- sparkContext.makeRDD(1 to 10)
- .map(i => ParquetData(i, s"part-$p"))
- .toDF()
- .write.parquet(partDir.getCanonicalPath)
- }
-
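-    // normalTableDir: a single non-partitioned directory of ten ParquetData rows.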
- sparkContext
- .makeRDD(1 to 10)
- .map(i => ParquetData(i, s"part-1"))
- .toDF()
- .write.parquet(new File(normalTableDir, "normal").getCanonicalPath)
-
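-    // Same partitioned layout as above, but the partition key p is also stored in the data files.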
- partitionedTableDirWithKey = Utils.createTempDir()
-
- (1 to 10).foreach { p =>
- val partDir = new File(partitionedTableDirWithKey, s"p=$p")
- sparkContext.makeRDD(1 to 10)
- .map(i => ParquetDataWithKey(p, i, s"part-$p"))
- .toDF()
- .write.parquet(partDir.getCanonicalPath)
- }
-
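-    // Partitioned rows that carry the key plus a struct and an array column, used by the
-    // SPARK-5775 struct/array read tests.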
- partitionedTableDirWithKeyAndComplexTypes = Utils.createTempDir()
-
- (1 to 10).foreach { p =>
- val partDir = new File(partitionedTableDirWithKeyAndComplexTypes, s"p=$p")
- sparkContext.makeRDD(1 to 10).map { i =>
- ParquetDataWithKeyAndComplexTypes(
- p, i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i)
- }.toDF().write.parquet(partDir.getCanonicalPath)
- }
-
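-    // The same complex-typed rows, but without the partition key stored in the files.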
- partitionedTableDirWithComplexTypes = Utils.createTempDir()
-
- (1 to 10).foreach { p =>
- val partDir = new File(partitionedTableDirWithComplexTypes, s"p=$p")
- sparkContext.makeRDD(1 to 10).map { i =>
- ParquetDataWithComplexTypes(i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i)
- }.toDF().write.parquet(partDir.getCanonicalPath)
- }
- }
-
- override protected def afterAll(): Unit = {
- try {
- partitionedTableDir.delete()
- normalTableDir.delete()
- partitionedTableDirWithKey.delete()
- partitionedTableDirWithComplexTypes.delete()
- partitionedTableDirWithKeyAndComplexTypes.delete()
- } finally {
- super.afterAll()
- }
- }
-
- /**
- * Drop named tables if they exist
- *
- * @param tableNames tables to drop
- */
- def dropTables(tableNames: String*): Unit = {
- tableNames.foreach { name =>
- sql(s"DROP TABLE IF EXISTS $name")
- }
- }
-
- Seq(
- "partitioned_parquet",
- "partitioned_parquet_with_key",
- "partitioned_parquet_with_complextypes",
- "partitioned_parquet_with_key_and_complextypes").foreach { table =>
-
- test(s"ordering of the partitioning columns $table") {
- checkAnswer(
- sql(s"SELECT p, stringField FROM $table WHERE p = 1"),
- Seq.fill(10)(Row(1, "part-1"))
- )
-
- checkAnswer(
- sql(s"SELECT stringField, p FROM $table WHERE p = 1"),
- Seq.fill(10)(Row("part-1", 1))
- )
- }
-
- test(s"project the partitioning column $table") {
- checkAnswer(
- sql(s"SELECT p, count(*) FROM $table group by p"),
- Row(1, 10) ::
- Row(2, 10) ::
- Row(3, 10) ::
- Row(4, 10) ::
- Row(5, 10) ::
- Row(6, 10) ::
- Row(7, 10) ::
- Row(8, 10) ::
- Row(9, 10) ::
- Row(10, 10) :: Nil
- )
- }
-
- test(s"project partitioning and non-partitioning columns $table") {
- checkAnswer(
- sql(s"SELECT stringField, p, count(intField) FROM $table GROUP BY p, stringField"),
- Row("part-1", 1, 10) ::
- Row("part-2", 2, 10) ::
- Row("part-3", 3, 10) ::
- Row("part-4", 4, 10) ::
- Row("part-5", 5, 10) ::
- Row("part-6", 6, 10) ::
- Row("part-7", 7, 10) ::
- Row("part-8", 8, 10) ::
- Row("part-9", 9, 10) ::
- Row("part-10", 10, 10) :: Nil
- )
- }
-
- test(s"simple count $table") {
- checkAnswer(
- sql(s"SELECT COUNT(*) FROM $table"),
- Row(100))
- }
-
- test(s"pruned count $table") {
- checkAnswer(
- sql(s"SELECT COUNT(*) FROM $table WHERE p = 1"),
- Row(10))
- }
-
- test(s"non-existent partition $table") {
- checkAnswer(
- sql(s"SELECT COUNT(*) FROM $table WHERE p = 1000"),
- Row(0))
- }
-
- test(s"multi-partition pruned count $table") {
- checkAnswer(
- sql(s"SELECT COUNT(*) FROM $table WHERE p IN (1,2,3)"),
- Row(30))
- }
-
- test(s"non-partition predicates $table") {
- checkAnswer(
- sql(s"SELECT COUNT(*) FROM $table WHERE intField IN (1,2,3)"),
- Row(30))
- }
-
- test(s"sum $table") {
- checkAnswer(
- sql(s"SELECT SUM(intField) FROM $table WHERE intField IN (1,2,3) AND p = 1"),
- Row(1 + 2 + 3))
- }
-
- test(s"hive udfs $table") {
- checkAnswer(
- sql(s"SELECT concat(stringField, stringField) FROM $table"),
- sql(s"SELECT stringField FROM $table").rdd.map {
- case Row(s: String) => Row(s + s)
- }.collect().toSeq)
- }
- }
-
- Seq(
- "partitioned_parquet_with_key_and_complextypes",
- "partitioned_parquet_with_complextypes").foreach { table =>
-
- test(s"SPARK-5775 read struct from $table") {
- checkAnswer(
- sql(
- s"""
- |SELECT p, structField.intStructField, structField.stringStructField
- |FROM $table WHERE p = 1
- """.stripMargin),
- (1 to 10).map(i => Row(1, i, f"${i}_string")))
- }
-
- test(s"SPARK-5775 read array from $table") {
- checkAnswer(
- sql(s"SELECT arrayField, p FROM $table WHERE p = 1"),
- (1 to 10).map(i => Row((1 to i).toArray, 1)))
- }
- }
-
- test("non-part select(*)") {
- checkAnswer(
- sql("SELECT COUNT(*) FROM normal_parquet"),
- Row(10))
- }
-}