Posted to commits@spark.apache.org by do...@apache.org on 2021/07/14 05:51:24 UTC
[spark] branch branch-3.2 updated: [SPARK-36131][SQL][TEST] Refactor ParquetColumnIndexSuite
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new b435560 [SPARK-36131][SQL][TEST] Refactor ParquetColumnIndexSuite
b435560 is described below
commit b4355608e0697b7c3e596a2104c17366fd6053bb
Author: Chao Sun <su...@apple.com>
AuthorDate: Tue Jul 13 22:49:55 2021 -0700
[SPARK-36131][SQL][TEST] Refactor ParquetColumnIndexSuite
### What changes were proposed in this pull request?
Refactor `ParquetColumnIndexSuite` and allow better code reuse.
### Why are the changes needed?
Several tests in the suite can share the same utility method `checkUnalignedPages`, so this change consolidates them around that method and removes the duplicated code.
Additionally, `parquet.enable.dictionary` is now exercised with both `true` and `false`.
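In short, each test now just builds its `DataFrame` and hands the shared filter actions to the helper, which writes the data once per dictionary setting and replays every filter against both the written file and the in-memory frame. A condensed sketch of the resulting helper (simplified from the diff below, not the verbatim test code):

    def checkUnalignedPages(df: DataFrame)(actions: (DataFrame => DataFrame)*): Unit = {
      Seq(true, false).foreach { enableDictionary =>
        withTempPath { file =>
          // Write with a small page size so column pages end at different row offsets.
          df.coalesce(1)
            .write
            .option("parquet.page.size", "4096")
            .option("parquet.enable.dictionary", enableDictionary.toString)
            .parquet(file.getCanonicalPath)
          val parquetDf = spark.read.parquet(file.getCanonicalPath)
          // Every filter must return the same rows from the file as from the source frame.
          actions.foreach(action => checkAnswer(action(parquetDf), action(df)))
        }
      }
    }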
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests.
Closes #33334 from sunchao/SPARK-35743-test-refactoring.
Authored-by: Chao Sun <su...@apple.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
(cherry picked from commit 7a7b086534ebb6d2d0f077897f43018f65334c6c)
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../parquet/ParquetColumnIndexSuite.scala | 129 ++++++++-------------
1 file changed, 49 insertions(+), 80 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnIndexSuite.scala
index f10b701..8a217fe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnIndexSuite.scala
@@ -23,6 +23,15 @@ import org.apache.spark.sql.test.SharedSparkSession
class ParquetColumnIndexSuite extends QueryTest with ParquetTest with SharedSparkSession {
import testImplicits._
+ private val actions: Seq[DataFrame => DataFrame] = Seq(
+ "_1 = 500",
+ "_1 = 500 or _1 = 1500",
+ "_1 = 500 or _1 = 501 or _1 = 1500",
+ "_1 = 500 or _1 = 501 or _1 = 1000 or _1 = 1500",
+ "_1 >= 500 and _1 < 1000",
+ "(_1 >= 500 and _1 < 1000) or (_1 >= 1500 and _1 < 1600)"
+ ).map(f => (df: DataFrame) => df.filter(f))
+
/**
* create parquet file with two columns and unaligned pages
* pages will be of the following layout
@@ -31,96 +40,56 @@ class ParquetColumnIndexSuite extends QueryTest with ParquetTest with SharedSpar
* |-------|-----|-----|---|---|---|---|---|
* col_2 400 300 200 200 200 200 200 200
*/
- def checkUnalignedPages(actions: (DataFrame => DataFrame)*): Unit = {
- withTempPath(file => {
- val ds = spark.range(0, 2000).map(i => (i, i + ":" + "o" * (i / 100).toInt))
- ds.coalesce(1)
- .write
- .option("parquet.page.size", "4096")
- .parquet(file.getCanonicalPath)
+ def checkUnalignedPages(df: DataFrame)(actions: (DataFrame => DataFrame)*): Unit = {
+ Seq(true, false).foreach { enableDictionary =>
+ withTempPath(file => {
+ df.coalesce(1)
+ .write
+ .option("parquet.page.size", "4096")
+ .option("parquet.enable.dictionary", enableDictionary.toString)
+ .parquet(file.getCanonicalPath)
- val parquetDf = spark.read.parquet(file.getCanonicalPath)
+ val parquetDf = spark.read.parquet(file.getCanonicalPath)
- actions.foreach { action =>
- checkAnswer(action(parquetDf), action(ds.toDF()))
- }
- })
+ actions.foreach { action =>
+ checkAnswer(action(parquetDf), action(df))
+ }
+ })
+ }
}
test("reading from unaligned pages - test filters") {
- checkUnalignedPages(
- // single value filter
- df => df.filter("_1 = 500"),
- df => df.filter("_1 = 500 or _1 = 1500"),
- df => df.filter("_1 = 500 or _1 = 501 or _1 = 1500"),
- df => df.filter("_1 = 500 or _1 = 501 or _1 = 1000 or _1 = 1500"),
- // range filter
- df => df.filter("_1 >= 500 and _1 < 1000"),
- df => df.filter("(_1 >= 500 and _1 < 1000) or (_1 >= 1500 and _1 < 1600)")
- )
+ val df = spark.range(0, 2000).map(i => (i, i + ":" + "o" * (i / 100).toInt)).toDF()
+ checkUnalignedPages(df)(actions: _*)
}
test("test reading unaligned pages - test all types") {
- withTempPath(file => {
- val df = spark.range(0, 2000).selectExpr(
- "id as _1",
- "cast(id as short) as _3",
- "cast(id as int) as _4",
- "cast(id as float) as _5",
- "cast(id as double) as _6",
- "cast(id as decimal(20,0)) as _7",
- "cast(cast(1618161925000 + id * 1000 * 60 * 60 * 24 as timestamp) as date) as _9",
- "cast(1618161925000 + id as timestamp) as _10"
- )
- df.coalesce(1)
- .write
- .option("parquet.page.size", "4096")
- .parquet(file.getCanonicalPath)
-
- val parquetDf = spark.read.parquet(file.getCanonicalPath)
- val singleValueFilterExpr = "_1 = 500 or _1 = 1500"
- checkAnswer(
- parquetDf.filter(singleValueFilterExpr),
- df.filter(singleValueFilterExpr)
- )
- val rangeFilterExpr = "_1 > 500 "
- checkAnswer(
- parquetDf.filter(rangeFilterExpr),
- df.filter(rangeFilterExpr)
- )
- })
+ val df = spark.range(0, 2000).selectExpr(
+ "id as _1",
+ "cast(id as short) as _3",
+ "cast(id as int) as _4",
+ "cast(id as float) as _5",
+ "cast(id as double) as _6",
+ "cast(id as decimal(20,0)) as _7",
+ "cast(cast(1618161925000 + id * 1000 * 60 * 60 * 24 as timestamp) as date) as _9",
+ "cast(1618161925000 + id as timestamp) as _10"
+ )
+ checkUnalignedPages(df)(actions: _*)
}
test("test reading unaligned pages - test all types (dict encode)") {
- withTempPath(file => {
- val df = spark.range(0, 2000).selectExpr(
- "id as _1",
- "cast(id % 10 as byte) as _2",
- "cast(id % 10 as short) as _3",
- "cast(id % 10 as int) as _4",
- "cast(id % 10 as float) as _5",
- "cast(id % 10 as double) as _6",
- "cast(id % 10 as decimal(20,0)) as _7",
- "cast(id % 2 as boolean) as _8",
- "cast(cast(1618161925000 + (id % 10) * 1000 * 60 * 60 * 24 as timestamp) as date) as _9",
- "cast(1618161925000 + (id % 10) as timestamp) as _10"
- )
- df.coalesce(1)
- .write
- .option("parquet.page.size", "4096")
- .parquet(file.getCanonicalPath)
-
- val parquetDf = spark.read.parquet(file.getCanonicalPath)
- val singleValueFilterExpr = "_1 = 500 or _1 = 1500"
- checkAnswer(
- parquetDf.filter(singleValueFilterExpr),
- df.filter(singleValueFilterExpr)
- )
- val rangeFilterExpr = "_1 > 500"
- checkAnswer(
- parquetDf.filter(rangeFilterExpr),
- df.filter(rangeFilterExpr)
- )
- })
+ val df = spark.range(0, 2000).selectExpr(
+ "id as _1",
+ "cast(id % 10 as byte) as _2",
+ "cast(id % 10 as short) as _3",
+ "cast(id % 10 as int) as _4",
+ "cast(id % 10 as float) as _5",
+ "cast(id % 10 as double) as _6",
+ "cast(id % 10 as decimal(20,0)) as _7",
+ "cast(id % 2 as boolean) as _8",
+ "cast(cast(1618161925000 + (id % 10) * 1000 * 60 * 60 * 24 as timestamp) as date) as _9",
+ "cast(1618161925000 + (id % 10) as timestamp) as _10"
+ )
+ checkUnalignedPages(df)(actions: _*)
}
}
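As a usage note, the unaligned-pages scenario the suite exercises can be reproduced outside the test harness. A self-contained sketch, assuming a local SparkSession and an illustrative output path (both are assumptions for illustration, not part of the commit):

    import org.apache.spark.sql.SparkSession

    object UnalignedPagesDemo {
      def main(args: Array[String]): Unit = {
        // Assumed local session, for illustration only.
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("unaligned-pages")
          .getOrCreate()
        import spark.implicits._

        // Same data as the suite: col _2 grows with i, so its ~4 KB pages end at
        // different row offsets than col _1's pages (i.e. unaligned pages).
        val ds = spark.range(0, 2000).map(i => (i, i + ":" + "o" * (i / 100).toInt))

        val path = "/tmp/unaligned-pages.parquet" // illustrative path, an assumption
        ds.coalesce(1)
          .write
          .mode("overwrite")
          .option("parquet.page.size", "4096")
          .parquet(path)

        // With Parquet column indexes, point filters like this can skip whole pages.
        val hits = spark.read.parquet(path).filter("_1 = 500 or _1 = 1500").count()
        assert(hits == 2)

        spark.stop()
      }
    }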