Posted to commits@spark.apache.org by we...@apache.org on 2021/04/13 09:42:11 UTC

[spark] branch branch-2.4 updated: [SPARK-34212][SQL][FOLLOWUP] Move the added test to ParquetQuerySuite

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 63ebabb  [SPARK-34212][SQL][FOLLOWUP] Move the added test to ParquetQuerySuite
63ebabb is described below

commit 63ebabba57de17479310cc5161d3f7bcabaedbb6
Author: Yuming Wang <yu...@ebay.com>
AuthorDate: Tue Apr 13 09:04:47 2021 +0000

    [SPARK-34212][SQL][FOLLOWUP] Move the added test to ParquetQuerySuite
    
    This PR moves the test added in SPARK-34212 from `SQLQuerySuite` to `ParquetQuerySuite`.
    
    1. The test can now be exercised by both `ParquetV1QuerySuite` and `ParquetV2QuerySuite`.
    2. It reduces the testing time of `SQLQuerySuite` (SQLQuerySuite ~ 3 min 17 sec, ParquetV1QuerySuite ~ 27 sec).
    
    No user-facing change.
    
    Verified by the moved unit test.
    
    Closes #32090 from wangyum/SPARK-34212.
    
    Authored-by: Yuming Wang <yu...@ebay.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 64 ----------------------
 .../datasources/parquet/ParquetQuerySuite.scala    | 64 +++++++++++++++++++++-
 2 files changed, 63 insertions(+), 65 deletions(-)
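
For context, the relocated test writes three decimal columns that Parquet stores with
different physical widths: the literal 1.0 is an int-backed DECIMAL(2, 1) (4 bytes),
DECIMAL(17, 2) is long-backed (8 bytes), and DECIMAL(36, 2) is stored as a 16-byte
binary. A minimal spark-shell sketch that produces such a file, assuming a local
`spark` session and a hypothetical output path that is not part of this commit:

    // Mirror the test data: one int-backed, one long-backed and one binary-backed decimal.
    val df = spark.sql(
      "SELECT 1.0 a, CAST(1.23 AS DECIMAL(17, 2)) b, CAST(1.23 AS DECIMAL(36, 2)) c")
    df.write.parquet("/tmp/spark-34212-decimals")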

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index f262eab..ab2a1c9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.execution.aggregate
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.datasources.FilePartition
-import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -3141,69 +3140,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       }
     })
   }
-
-  test("SPARK-34212 Parquet should read decimals correctly") {
-    def readParquet(schema: String, path: File): DataFrame = {
-      spark.read.schema(schema).parquet(path.toString)
-    }
-
-    withTempPath { path =>
-      // a is int-decimal (4 bytes), b is long-decimal (8 bytes), c is binary-decimal (16 bytes)
-      val df = sql("SELECT 1.0 a, CAST(1.23 AS DECIMAL(17, 2)) b, CAST(1.23 AS DECIMAL(36, 2)) c")
-      df.write.parquet(path.toString)
-
-      Seq(true, false).foreach { vectorizedReader =>
-        withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorizedReader.toString) {
-          // We can read the decimal parquet field with a larger precision, if scale is the same.
-          val schema = "a DECIMAL(9, 1), b DECIMAL(18, 2), c DECIMAL(38, 2)"
-          checkAnswer(readParquet(schema, path), df)
-        }
-      }
-
-      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
-        val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)"
-        checkAnswer(readParquet(schema1, path), df)
-        val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)"
-        checkAnswer(readParquet(schema2, path), Row(1, 1.2, 1.2))
-      }
-
-      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
-        Seq("a DECIMAL(3, 2)", "b DECIMAL(18, 1)", "c DECIMAL(37, 1)").foreach { schema =>
-          val e = intercept[SparkException] {
-            readParquet(schema, path).collect()
-          }.getCause.getCause
-          assert(e.isInstanceOf[SchemaColumnConvertNotSupportedException])
-        }
-      }
-    }
-
-    // tests for parquet types without decimal metadata.
-    withTempPath { path =>
-      val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d")
-      df.write.parquet(path.toString)
-
-      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
-        checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00"))
-        checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null))
-        checkAnswer(readParquet("b DECIMAL(11, 1)", path), sql("SELECT 123456.0"))
-        checkAnswer(readParquet("c DECIMAL(11, 1)", path), Row(null))
-        checkAnswer(readParquet("c DECIMAL(13, 0)", path), df.select("c"))
-        val e = intercept[SparkException] {
-          readParquet("d DECIMAL(3, 2)", path).collect()
-        }.getCause
-        assert(e.getMessage.contains("Please read this column/field as Spark BINARY type"))
-      }
-
-      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
-        Seq("a DECIMAL(3, 2)", "c DECIMAL(18, 1)", "d DECIMAL(37, 1)").foreach { schema =>
-          val e = intercept[SparkException] {
-            readParquet(schema, path).collect()
-          }.getCause.getCause
-          assert(e.isInstanceOf[SchemaColumnConvertNotSupportedException])
-        }
-      }
-    }
-  }
 }
 
 case class Foo(bar: Option[String])
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 7f8357c..12c8e63 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
 import org.apache.spark.sql.execution.FileSourceScanExec
-import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
+import org.apache.spark.sql.execution.datasources.{SchemaColumnConvertNotSupportedException, SQLHadoopMapReduceCommitProtocol}
 import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT, SingleElement}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -906,6 +906,68 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
     }
   }
 
+  test("SPARK-34212 Parquet should read decimals correctly") {
+    def readParquet(schema: String, path: File): DataFrame = {
+      spark.read.schema(schema).parquet(path.toString)
+    }
+
+    withTempPath { path =>
+      // a is int-decimal (4 bytes), b is long-decimal (8 bytes), c is binary-decimal (16 bytes)
+      val df = sql("SELECT 1.0 a, CAST(1.23 AS DECIMAL(17, 2)) b, CAST(1.23 AS DECIMAL(36, 2)) c")
+      df.write.parquet(path.toString)
+
+      Seq(true, false).foreach { vectorizedReader =>
+        withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorizedReader.toString) {
+          // We can read the decimal parquet field with a larger precision, if scale is the same.
+          val schema = "a DECIMAL(9, 1), b DECIMAL(18, 2), c DECIMAL(38, 2)"
+          checkAnswer(readParquet(schema, path), df)
+        }
+      }
+
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
+        val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)"
+        checkAnswer(readParquet(schema1, path), df)
+        val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)"
+        checkAnswer(readParquet(schema2, path), Row(1, 1.2, 1.2))
+      }
+
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
+        Seq("a DECIMAL(3, 2)", "b DECIMAL(18, 1)", "c DECIMAL(37, 1)").foreach { schema =>
+          val e = intercept[SparkException] {
+            readParquet(schema, path).collect()
+          }.getCause.getCause
+          assert(e.isInstanceOf[SchemaColumnConvertNotSupportedException])
+        }
+      }
+    }
+
+    // tests for parquet types without decimal metadata.
+    withTempPath { path =>
+      val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d")
+      df.write.parquet(path.toString)
+
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
+        checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00"))
+        checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null))
+        checkAnswer(readParquet("b DECIMAL(11, 1)", path), sql("SELECT 123456.0"))
+        checkAnswer(readParquet("c DECIMAL(11, 1)", path), Row(null))
+        checkAnswer(readParquet("c DECIMAL(13, 0)", path), df.select("c"))
+        val e = intercept[SparkException] {
+          readParquet("d DECIMAL(3, 2)", path).collect()
+        }.getCause
+        assert(e.getMessage.contains("Please read this column/field as Spark BINARY type"))
+      }
+
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
+        Seq("a DECIMAL(3, 2)", "c DECIMAL(18, 1)", "d DECIMAL(37, 1)").foreach { schema =>
+          val e = intercept[SparkException] {
+            readParquet(schema, path).collect()
+          }.getCause.getCause
+          assert(e.isInstanceOf[SchemaColumnConvertNotSupportedException])
+        }
+      }
+    }
+  }
 }
 
 object TestingUDT {
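
Reading the file back shows the behavior the moved test pins down. A hedged spark-shell
sketch, reusing the hypothetical path from the write sketch above; the conf key is the
one behind SQLConf.PARQUET_VECTORIZED_READER_ENABLED:

    // Widening precision while keeping the scale works with either reader.
    spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")
    spark.read.schema("a DECIMAL(9, 1), b DECIMAL(18, 2), c DECIMAL(38, 2)")
      .parquet("/tmp/spark-34212-decimals")
      .show()

    // Changing the scale is only handled with the vectorized reader disabled
    // (the test expects Row(1, 1.2, 1.2) here); with it enabled, the test expects
    // a SchemaColumnConvertNotSupportedException for such reads.
    spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
    spark.read.schema("a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)")
      .parquet("/tmp/spark-34212-decimals")
      .show()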
