You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2021/07/14 05:51:24 UTC

[spark] branch branch-3.2 updated: [SPARK-36131][SQL][TEST] Refactor ParquetColumnIndexSuite

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new b435560  [SPARK-36131][SQL][TEST] Refactor ParquetColumnIndexSuite
b435560 is described below

commit b4355608e0697b7c3e596a2104c17366fd6053bb
Author: Chao Sun <su...@apple.com>
AuthorDate: Tue Jul 13 22:49:55 2021 -0700

    [SPARK-36131][SQL][TEST] Refactor ParquetColumnIndexSuite
    
    ### What changes were proposed in this pull request?
    
    Refactor `ParquetColumnIndexSuite` and allow better code reuse.
    
    ### Why are the changes needed?
    
    Several tests in the suite repeat the same write-read-compare logic. They can all share the utility method `checkUnalignedPages`, which removes the code duplication.
    
    Additionally, each test now runs with `parquet.enable.dictionary` set to both `true` and `false`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing tests.
    
    Closes #33334 from sunchao/SPARK-35743-test-refactoring.
    
    Authored-by: Chao Sun <su...@apple.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
    (cherry picked from commit 7a7b086534ebb6d2d0f077897f43018f65334c6c)
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../parquet/ParquetColumnIndexSuite.scala          | 129 ++++++++-------------
 1 file changed, 49 insertions(+), 80 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnIndexSuite.scala
index f10b701..8a217fe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnIndexSuite.scala
@@ -23,6 +23,15 @@ import org.apache.spark.sql.test.SharedSparkSession
 class ParquetColumnIndexSuite extends QueryTest with ParquetTest with SharedSparkSession {
   import testImplicits._
 
+  private val actions: Seq[DataFrame => DataFrame] = Seq(
+    "_1 = 500",
+    "_1 = 500 or _1 = 1500",
+    "_1 = 500 or _1 = 501 or _1 = 1500",
+    "_1 = 500 or _1 = 501 or _1 = 1000 or _1 = 1500",
+    "_1 >= 500 and _1 < 1000",
+    "(_1 >= 500 and _1 < 1000) or (_1 >= 1500 and _1 < 1600)"
+  ).map(f => (df: DataFrame) => df.filter(f))
+
   /**
    * create parquet file with two columns and unaligned pages
    * pages will be of the following layout
@@ -31,96 +40,56 @@ class ParquetColumnIndexSuite extends QueryTest with ParquetTest with SharedSpar
    *  |-------|-----|-----|---|---|---|---|---|
    * col_2   400   300   200 200 200 200 200 200
    */
-  def checkUnalignedPages(actions: (DataFrame => DataFrame)*): Unit = {
-    withTempPath(file => {
-      val ds = spark.range(0, 2000).map(i => (i, i + ":" + "o" * (i / 100).toInt))
-      ds.coalesce(1)
-          .write
-          .option("parquet.page.size", "4096")
-          .parquet(file.getCanonicalPath)
+  def checkUnalignedPages(df: DataFrame)(actions: (DataFrame => DataFrame)*): Unit = {
+    Seq(true, false).foreach { enableDictionary =>
+      withTempPath(file => {
+        df.coalesce(1)
+            .write
+            .option("parquet.page.size", "4096")
+            .option("parquet.enable.dictionary", enableDictionary.toString)
+            .parquet(file.getCanonicalPath)
 
-      val parquetDf = spark.read.parquet(file.getCanonicalPath)
+        val parquetDf = spark.read.parquet(file.getCanonicalPath)
 
-      actions.foreach { action =>
-        checkAnswer(action(parquetDf), action(ds.toDF()))
-      }
-    })
+        actions.foreach { action =>
+          checkAnswer(action(parquetDf), action(df))
+        }
+      })
+    }
   }
 
   test("reading from unaligned pages - test filters") {
-    checkUnalignedPages(
-      // single value filter
-      df => df.filter("_1 = 500"),
-      df => df.filter("_1 = 500 or _1 = 1500"),
-      df => df.filter("_1 = 500 or _1 = 501 or _1 = 1500"),
-      df => df.filter("_1 = 500 or _1 = 501 or _1 = 1000 or _1 = 1500"),
-      // range filter
-      df => df.filter("_1 >= 500 and _1 < 1000"),
-      df => df.filter("(_1 >= 500 and _1 < 1000) or (_1 >= 1500 and _1 < 1600)")
-    )
+    val df = spark.range(0, 2000).map(i => (i, i + ":" + "o" * (i / 100).toInt)).toDF()
+    checkUnalignedPages(df)(actions: _*)
   }
 
   test("test reading unaligned pages - test all types") {
-    withTempPath(file => {
-      val df = spark.range(0, 2000).selectExpr(
-        "id as _1",
-        "cast(id as short) as _3",
-        "cast(id as int) as _4",
-        "cast(id as float) as _5",
-        "cast(id as double) as _6",
-        "cast(id as decimal(20,0)) as _7",
-        "cast(cast(1618161925000 + id * 1000 * 60 * 60 * 24 as timestamp) as date) as _9",
-        "cast(1618161925000 + id as timestamp) as _10"
-      )
-      df.coalesce(1)
-          .write
-          .option("parquet.page.size", "4096")
-          .parquet(file.getCanonicalPath)
-
-      val parquetDf = spark.read.parquet(file.getCanonicalPath)
-      val singleValueFilterExpr = "_1 = 500 or _1 = 1500"
-      checkAnswer(
-        parquetDf.filter(singleValueFilterExpr),
-        df.filter(singleValueFilterExpr)
-      )
-      val rangeFilterExpr = "_1 > 500 "
-      checkAnswer(
-        parquetDf.filter(rangeFilterExpr),
-        df.filter(rangeFilterExpr)
-      )
-    })
+    val df = spark.range(0, 2000).selectExpr(
+      "id as _1",
+      "cast(id as short) as _3",
+      "cast(id as int) as _4",
+      "cast(id as float) as _5",
+      "cast(id as double) as _6",
+      "cast(id as decimal(20,0)) as _7",
+      "cast(cast(1618161925000 + id * 1000 * 60 * 60 * 24 as timestamp) as date) as _9",
+      "cast(1618161925000 + id as timestamp) as _10"
+    )
+    checkUnalignedPages(df)(actions: _*)
   }
 
   test("test reading unaligned pages - test all types (dict encode)") {
-    withTempPath(file => {
-      val df = spark.range(0, 2000).selectExpr(
-        "id as _1",
-        "cast(id % 10 as byte) as _2",
-        "cast(id % 10 as short) as _3",
-        "cast(id % 10 as int) as _4",
-        "cast(id % 10 as float) as _5",
-        "cast(id % 10 as double) as _6",
-        "cast(id % 10 as decimal(20,0)) as _7",
-        "cast(id % 2 as boolean) as _8",
-        "cast(cast(1618161925000 + (id % 10) * 1000 * 60 * 60 * 24 as timestamp) as date) as _9",
-        "cast(1618161925000 + (id % 10) as timestamp) as _10"
-      )
-      df.coalesce(1)
-          .write
-          .option("parquet.page.size", "4096")
-          .parquet(file.getCanonicalPath)
-
-      val parquetDf = spark.read.parquet(file.getCanonicalPath)
-      val singleValueFilterExpr = "_1 = 500 or _1 = 1500"
-      checkAnswer(
-        parquetDf.filter(singleValueFilterExpr),
-        df.filter(singleValueFilterExpr)
-      )
-      val rangeFilterExpr = "_1 > 500"
-      checkAnswer(
-        parquetDf.filter(rangeFilterExpr),
-        df.filter(rangeFilterExpr)
-      )
-    })
+    val df = spark.range(0, 2000).selectExpr(
+      "id as _1",
+      "cast(id % 10 as byte) as _2",
+      "cast(id % 10 as short) as _3",
+      "cast(id % 10 as int) as _4",
+      "cast(id % 10 as float) as _5",
+      "cast(id % 10 as double) as _6",
+      "cast(id % 10 as decimal(20,0)) as _7",
+      "cast(id % 2 as boolean) as _8",
+      "cast(cast(1618161925000 + (id % 10) * 1000 * 60 * 60 * 24 as timestamp) as date) as _9",
+      "cast(1618161925000 + (id % 10) as timestamp) as _10"
+    )
+    checkUnalignedPages(df)(actions: _*)
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org