Posted to commits@spark.apache.org by gu...@apache.org on 2020/05/22 00:54:52 UTC
[spark] branch master updated: [SPARK-31785][SQL][TESTS] Add a helper function to test all parquet readers
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 60118a2 [SPARK-31785][SQL][TESTS] Add a helper function to test all parquet readers
60118a2 is described below
commit 60118a242639df060a9fdcaa4f14cd072ea3d056
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Fri May 22 09:53:35 2020 +0900
[SPARK-31785][SQL][TESTS] Add a helper function to test all parquet readers
### What changes were proposed in this pull request?
Add `withAllParquetReaders` to `ParquetTest`. The function runs a block of code once for each available Parquet reader (row-based and vectorized).
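For example, after this change the INT32 decimal test in `ParquetIOSuite` (taken from the diff below) exercises both readers with a single block:

```scala
test("read dictionary encoded decimals written as INT32") {
  withAllParquetReaders {
    checkAnswer(
      // Decimal column in this file is encoded using plain dictionary
      readResourceParquetFile("test-data/dec-in-i32.parquet"),
      spark.range(1 << 4).select('id % 10 cast DecimalType(5, 2) as 'i32_dec))
  }
}
```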
### Why are the changes needed?
1. It simplifies tests.
2. It allows testing all Parquet readers that may be available in projects based on Apache Spark.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
By running affected test suites.
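For instance, an individual affected suite can be run with the usual Spark sbt workflow, e.g. `build/sbt "sql/testOnly *ParquetIOSuite"` (the exact command is illustrative, not quoted from the PR).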
Closes #28598 from MaxGekk/add-withAllParquetReaders.
Authored-by: Max Gekk <ma...@gmail.com>
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
.../datasources/parquet/ParquetFilterSuite.scala | 39 +++---
.../datasources/parquet/ParquetIOSuite.scala | 144 ++++++++++-----------
.../parquet/ParquetInteroperabilitySuite.scala | 8 +-
.../datasources/parquet/ParquetQuerySuite.scala | 30 ++---
.../datasources/parquet/ParquetTest.scala | 7 +
5 files changed, 106 insertions(+), 122 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 5cf2129..7b33cef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -781,10 +781,9 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
test("Filter applied on merged Parquet schema with new column should work") {
import testImplicits._
- Seq("true", "false").foreach { vectorized =>
+ withAllParquetReaders {
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
- SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
- SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
+ SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
withTempPath { dir =>
val path1 = s"${dir.getCanonicalPath}/table1"
(1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path1)
@@ -1219,24 +1218,22 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
}
test("SPARK-17213: Broken Parquet filter push-down for string columns") {
- Seq(true, false).foreach { vectorizedEnabled =>
- withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorizedEnabled.toString) {
- withTempPath { dir =>
- import testImplicits._
+ withAllParquetReaders {
+ withTempPath { dir =>
+ import testImplicits._
- val path = dir.getCanonicalPath
- // scalastyle:off nonascii
- Seq("a", "é").toDF("name").write.parquet(path)
- // scalastyle:on nonascii
+ val path = dir.getCanonicalPath
+ // scalastyle:off nonascii
+ Seq("a", "é").toDF("name").write.parquet(path)
+ // scalastyle:on nonascii
- assert(spark.read.parquet(path).where("name > 'a'").count() == 1)
- assert(spark.read.parquet(path).where("name >= 'a'").count() == 2)
+ assert(spark.read.parquet(path).where("name > 'a'").count() == 1)
+ assert(spark.read.parquet(path).where("name >= 'a'").count() == 2)
- // scalastyle:off nonascii
- assert(spark.read.parquet(path).where("name < 'é'").count() == 1)
- assert(spark.read.parquet(path).where("name <= 'é'").count() == 2)
- // scalastyle:on nonascii
- }
+ // scalastyle:off nonascii
+ assert(spark.read.parquet(path).where("name < 'é'").count() == 1)
+ assert(spark.read.parquet(path).where("name <= 'é'").count() == 2)
+ // scalastyle:on nonascii
}
}
}
@@ -1244,8 +1241,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
test("SPARK-31026: Parquet predicate pushdown for fields having dots in the names") {
import testImplicits._
- Seq(true, false).foreach { vectorized =>
- withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString,
+ withAllParquetReaders {
+ withSQLConf(
SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> true.toString,
SQLConf.SUPPORT_QUOTED_REGEX_COLUMN_NAME.key -> "false") {
withTempPath { path =>
@@ -1255,7 +1252,7 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
}
}
- withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString,
+ withSQLConf(
// Makes sure disabling 'spark.sql.parquet.recordFilter' still enables
// row group level filtering.
SQLConf.PARQUET_RECORD_FILTER_ENABLED.key -> "false",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 87b4db3..f075d04 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -647,47 +647,39 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}
test("read dictionary encoded decimals written as INT32") {
- ("true" :: "false" :: Nil).foreach { vectorized =>
- withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
- checkAnswer(
- // Decimal column in this file is encoded using plain dictionary
- readResourceParquetFile("test-data/dec-in-i32.parquet"),
- spark.range(1 << 4).select('id % 10 cast DecimalType(5, 2) as 'i32_dec))
- }
+ withAllParquetReaders {
+ checkAnswer(
+ // Decimal column in this file is encoded using plain dictionary
+ readResourceParquetFile("test-data/dec-in-i32.parquet"),
+ spark.range(1 << 4).select('id % 10 cast DecimalType(5, 2) as 'i32_dec))
}
}
test("read dictionary encoded decimals written as INT64") {
- ("true" :: "false" :: Nil).foreach { vectorized =>
- withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
- checkAnswer(
- // Decimal column in this file is encoded using plain dictionary
- readResourceParquetFile("test-data/dec-in-i64.parquet"),
- spark.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'i64_dec))
- }
+ withAllParquetReaders {
+ checkAnswer(
+ // Decimal column in this file is encoded using plain dictionary
+ readResourceParquetFile("test-data/dec-in-i64.parquet"),
+ spark.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'i64_dec))
}
}
test("read dictionary encoded decimals written as FIXED_LEN_BYTE_ARRAY") {
- ("true" :: "false" :: Nil).foreach { vectorized =>
- withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
- checkAnswer(
- // Decimal column in this file is encoded using plain dictionary
- readResourceParquetFile("test-data/dec-in-fixed-len.parquet"),
- spark.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'fixed_len_dec))
- }
+ withAllParquetReaders {
+ checkAnswer(
+ // Decimal column in this file is encoded using plain dictionary
+ readResourceParquetFile("test-data/dec-in-fixed-len.parquet"),
+ spark.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'fixed_len_dec))
}
}
test("read dictionary and plain encoded timestamp_millis written as INT64") {
- ("true" :: "false" :: Nil).foreach { vectorized =>
- withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
- checkAnswer(
- // timestamp column in this file is encoded using combination of plain
- // and dictionary encodings.
- readResourceParquetFile("test-data/timemillis-in-i64.parquet"),
- (1 to 3).map(i => Row(new java.sql.Timestamp(10))))
- }
+ withAllParquetReaders {
+ checkAnswer(
+ // timestamp column in this file is encoded using combination of plain
+ // and dictionary encodings.
+ readResourceParquetFile("test-data/timemillis-in-i64.parquet"),
+ (1 to 3).map(i => Row(new java.sql.Timestamp(10))))
}
}
@@ -943,23 +935,21 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}
}
- Seq(false, true).foreach { vectorized =>
- withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
- checkReadMixedFiles("before_1582_date_v2_4.snappy.parquet", "date", "1001-01-01")
- checkReadMixedFiles(
- "before_1582_timestamp_micros_v2_4.snappy.parquet",
- "TIMESTAMP_MICROS",
- "1001-01-01 01:02:03.123456")
- checkReadMixedFiles(
- "before_1582_timestamp_millis_v2_4.snappy.parquet",
- "TIMESTAMP_MILLIS",
- "1001-01-01 01:02:03.123")
-
- // INT96 is a legacy timestamp format and we always rebase the seconds for it.
- checkAnswer(readResourceParquetFile(
- "test-data/before_1582_timestamp_int96_v2_4.snappy.parquet"),
- Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
- }
+ withAllParquetReaders {
+ checkReadMixedFiles("before_1582_date_v2_4.snappy.parquet", "date", "1001-01-01")
+ checkReadMixedFiles(
+ "before_1582_timestamp_micros_v2_4.snappy.parquet",
+ "TIMESTAMP_MICROS",
+ "1001-01-01 01:02:03.123456")
+ checkReadMixedFiles(
+ "before_1582_timestamp_millis_v2_4.snappy.parquet",
+ "TIMESTAMP_MILLIS",
+ "1001-01-01 01:02:03.123")
+
+ // INT96 is a legacy timestamp format and we always rebase the seconds for it.
+ checkAnswer(readResourceParquetFile(
+ "test-data/before_1582_timestamp_int96_v2_4.snappy.parquet"),
+ Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
}
}
@@ -984,27 +974,25 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
.parquet(path)
}
- Seq(false, true).foreach { vectorized =>
- withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
- // The file metadata indicates if it needs rebase or not, so we can always get the
- // correct result regardless of the "rebase mode" config.
- Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode =>
- withSQLConf(
- SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key -> mode.toString) {
- checkAnswer(
- spark.read.parquet(path),
- Seq.tabulate(N)(_ => Row(Timestamp.valueOf(tsStr))))
- }
- }
-
- // Force to not rebase to prove the written datetime values are rebased
- // and we will get wrong result if we don't rebase while reading.
- withSQLConf("spark.test.forceNoRebase" -> "true") {
+ withAllParquetReaders {
+ // The file metadata indicates if it needs rebase or not, so we can always get the
+ // correct result regardless of the "rebase mode" config.
+ Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode =>
+ withSQLConf(
+ SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key -> mode.toString) {
checkAnswer(
spark.read.parquet(path),
- Seq.tabulate(N)(_ => Row(Timestamp.valueOf(nonRebased))))
+ Seq.tabulate(N)(_ => Row(Timestamp.valueOf(tsStr))))
}
}
+
+ // Force to not rebase to prove the written datetime values are rebased
+ // and we will get wrong result if we don't rebase while reading.
+ withSQLConf("spark.test.forceNoRebase" -> "true") {
+ checkAnswer(
+ spark.read.parquet(path),
+ Seq.tabulate(N)(_ => Row(Timestamp.valueOf(nonRebased))))
+ }
}
}
}
@@ -1027,26 +1015,24 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
.parquet(path)
}
- Seq(false, true).foreach { vectorized =>
- withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
- // The file metadata indicates if it needs rebase or not, so we can always get the
- // correct result regardless of the "rebase mode" config.
- Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode =>
- withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> mode.toString) {
- checkAnswer(
- spark.read.parquet(path),
- Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-01"))))
- }
- }
-
- // Force to not rebase to prove the written datetime values are rebased and we will get
- // wrong result if we don't rebase while reading.
- withSQLConf("spark.test.forceNoRebase" -> "true") {
+ withAllParquetReaders {
+ // The file metadata indicates if it needs rebase or not, so we can always get the
+ // correct result regardless of the "rebase mode" config.
+ Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode =>
+ withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> mode.toString) {
checkAnswer(
spark.read.parquet(path),
- Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-07"))))
+ Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-01"))))
}
}
+
+ // Force to not rebase to prove the written datetime values are rebased and we will get
+ // wrong result if we don't rebase while reading.
+ withSQLConf("spark.test.forceNoRebase" -> "true") {
+ checkAnswer(
+ spark.read.parquet(path),
+ Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-07"))))
+ }
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
index 7d75077..a14f641 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
@@ -124,12 +124,11 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS
FileUtils.copyFile(new File(impalaPath), new File(tableDir, "part-00001.parq"))
Seq(false, true).foreach { int96TimestampConversion =>
- Seq(false, true).foreach { vectorized =>
+ withAllParquetReaders {
withSQLConf(
(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key,
SQLConf.ParquetOutputTimestampType.INT96.toString),
- (SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key, int96TimestampConversion.toString()),
- (SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, vectorized.toString())
+ (SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key, int96TimestampConversion.toString())
) {
val readBack = spark.read.parquet(tableDir.getAbsolutePath).collect()
assert(readBack.size === 6)
@@ -149,7 +148,8 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS
val fullExpectations = (ts ++ impalaExpectations).map(_.toString).sorted.toArray
val actual = readBack.map(_.getTimestamp(0).toString).sorted
withClue(
- s"int96TimestampConversion = $int96TimestampConversion; vectorized = $vectorized") {
+ s"int96TimestampConversion = $int96TimestampConversion; " +
+ s"vectorized = ${SQLConf.get.parquetVectorizedReaderEnabled}") {
assert(fullExpectations === actual)
// Now test that the behavior is still correct even with a filter which could get
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 917aaba..05d305a9b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -168,11 +168,9 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
withTempPath { file =>
val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
df.write.parquet(file.getCanonicalPath)
- ("true" :: "false" :: Nil).foreach { vectorized =>
- withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
- val df2 = spark.read.parquet(file.getCanonicalPath)
- checkAnswer(df2, df.collect().toSeq)
- }
+ withAllParquetReaders {
+ val df2 = spark.read.parquet(file.getCanonicalPath)
+ checkAnswer(df2, df.collect().toSeq)
}
}
}
@@ -791,15 +789,13 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
}
test("SPARK-26677: negated null-safe equality comparison should not filter matched row groups") {
- (true :: false :: Nil).foreach { vectorized =>
- withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
- withTempPath { path =>
- // Repeated values for dictionary encoding.
- Seq(Some("A"), Some("A"), None).toDF.repartition(1)
- .write.parquet(path.getAbsolutePath)
- val df = spark.read.parquet(path.getAbsolutePath)
- checkAnswer(stripSparkFilter(df.where("NOT (value <=> 'A')")), df)
- }
+ withAllParquetReaders {
+ withTempPath { path =>
+ // Repeated values for dictionary encoding.
+ Seq(Some("A"), Some("A"), None).toDF.repartition(1)
+ .write.parquet(path.getAbsolutePath)
+ val df = spark.read.parquet(path.getAbsolutePath)
+ checkAnswer(stripSparkFilter(df.where("NOT (value <=> 'A')")), df)
}
}
}
@@ -821,10 +817,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> toTsType) {
write(df2.write.mode(SaveMode.Append))
}
- Seq("true", "false").foreach { vectorized =>
- withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
- checkAnswer(readback, df1.unionAll(df2))
- }
+ withAllParquetReaders {
+ checkAnswer(readback, df1.unionAll(df2))
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
index c833d5f..f572697 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
@@ -162,4 +162,11 @@ private[sql] trait ParquetTest extends FileBasedDataSourceTest {
protected def getResourceParquetFilePath(name: String): String = {
Thread.currentThread().getContextClassLoader.getResource(name).toString
}
+
+ def withAllParquetReaders(code: => Unit): Unit = {
+ // test the row-based reader
+ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false")(code)
+ // test the vectorized reader
+ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true")(code)
+ }
}