You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/04/13 09:42:11 UTC
[spark] branch branch-2.4 updated: [SPARK-34212][SQL][FOLLOWUP]
Move the added test to ParquetQuerySuite
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 63ebabb [SPARK-34212][SQL][FOLLOWUP] Move the added test to ParquetQuerySuite
63ebabb is described below
commit 63ebabba57de17479310cc5161d3f7bcabaedbb6
Author: Yuming Wang <yu...@ebay.com>
AuthorDate: Tue Apr 13 09:04:47 2021 +0000
[SPARK-34212][SQL][FOLLOWUP] Move the added test to ParquetQuerySuite
This PR moves the added test from `SQLQuerySuite` to `ParquetQuerySuite`.
1. It can be tested by `ParquetV1QuerySuite` and `ParquetV2QuerySuite`.
2. It reduces the testing time of `SQLQuerySuite` (SQLQuerySuite ~ 3 min 17 sec, ParquetV1QuerySuite ~ 27 sec).
Does this PR introduce any user-facing change? No.
How was this patch tested? Unit test.
Closes #32090 from wangyum/SPARK-34212.
Authored-by: Yuming Wang <yu...@ebay.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../scala/org/apache/spark/sql/SQLQuerySuite.scala | 64 ----------------------
.../datasources/parquet/ParquetQuerySuite.scala | 64 +++++++++++++++++++++-
2 files changed, 63 insertions(+), 65 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index f262eab..ab2a1c9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.execution.aggregate
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.datasources.FilePartition
-import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
@@ -3141,69 +3140,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
})
}
-
- test("SPARK-34212 Parquet should read decimals correctly") {
- def readParquet(schema: String, path: File): DataFrame = {
- spark.read.schema(schema).parquet(path.toString)
- }
-
- withTempPath { path =>
- // a is int-decimal (4 bytes), b is long-decimal (8 bytes), c is binary-decimal (16 bytes)
- val df = sql("SELECT 1.0 a, CAST(1.23 AS DECIMAL(17, 2)) b, CAST(1.23 AS DECIMAL(36, 2)) c")
- df.write.parquet(path.toString)
-
- Seq(true, false).foreach { vectorizedReader =>
- withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorizedReader.toString) {
- // We can read the decimal parquet field with a larger precision, if scale is the same.
- val schema = "a DECIMAL(9, 1), b DECIMAL(18, 2), c DECIMAL(38, 2)"
- checkAnswer(readParquet(schema, path), df)
- }
- }
-
- withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
- val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)"
- checkAnswer(readParquet(schema1, path), df)
- val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)"
- checkAnswer(readParquet(schema2, path), Row(1, 1.2, 1.2))
- }
-
- withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
- Seq("a DECIMAL(3, 2)", "b DECIMAL(18, 1)", "c DECIMAL(37, 1)").foreach { schema =>
- val e = intercept[SparkException] {
- readParquet(schema, path).collect()
- }.getCause.getCause
- assert(e.isInstanceOf[SchemaColumnConvertNotSupportedException])
- }
- }
- }
-
- // tests for parquet types without decimal metadata.
- withTempPath { path =>
- val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d")
- df.write.parquet(path.toString)
-
- withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
- checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00"))
- checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null))
- checkAnswer(readParquet("b DECIMAL(11, 1)", path), sql("SELECT 123456.0"))
- checkAnswer(readParquet("c DECIMAL(11, 1)", path), Row(null))
- checkAnswer(readParquet("c DECIMAL(13, 0)", path), df.select("c"))
- val e = intercept[SparkException] {
- readParquet("d DECIMAL(3, 2)", path).collect()
- }.getCause
- assert(e.getMessage.contains("Please read this column/field as Spark BINARY type"))
- }
-
- withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
- Seq("a DECIMAL(3, 2)", "c DECIMAL(18, 1)", "d DECIMAL(37, 1)").foreach { schema =>
- val e = intercept[SparkException] {
- readParquet(schema, path).collect()
- }.getCause.getCause
- assert(e.isInstanceOf[SchemaColumnConvertNotSupportedException])
- }
- }
- }
- }
}
case class Foo(bar: Option[String])
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 7f8357c..12c8e63 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.execution.FileSourceScanExec
-import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
+import org.apache.spark.sql.execution.datasources.{SchemaColumnConvertNotSupportedException, SQLHadoopMapReduceCommitProtocol}
import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT, SingleElement}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
@@ -906,6 +906,68 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
}
}
+ test("SPARK-34212 Parquet should read decimals correctly") {
+ def readParquet(schema: String, path: File): DataFrame = {
+ spark.read.schema(schema).parquet(path.toString)
+ }
+
+ withTempPath { path =>
+ // a is int-decimal (4 bytes), b is long-decimal (8 bytes), c is binary-decimal (16 bytes)
+ val df = sql("SELECT 1.0 a, CAST(1.23 AS DECIMAL(17, 2)) b, CAST(1.23 AS DECIMAL(36, 2)) c")
+ df.write.parquet(path.toString)
+
+ Seq(true, false).foreach { vectorizedReader =>
+ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorizedReader.toString) {
+ // We can read the decimal parquet field with a larger precision, if scale is the same.
+ val schema = "a DECIMAL(9, 1), b DECIMAL(18, 2), c DECIMAL(38, 2)"
+ checkAnswer(readParquet(schema, path), df)
+ }
+ }
+
+ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
+ val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)"
+ checkAnswer(readParquet(schema1, path), df)
+ val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)"
+ checkAnswer(readParquet(schema2, path), Row(1, 1.2, 1.2))
+ }
+
+ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
+ Seq("a DECIMAL(3, 2)", "b DECIMAL(18, 1)", "c DECIMAL(37, 1)").foreach { schema =>
+ val e = intercept[SparkException] {
+ readParquet(schema, path).collect()
+ }.getCause.getCause
+ assert(e.isInstanceOf[SchemaColumnConvertNotSupportedException])
+ }
+ }
+ }
+
+ // tests for parquet types without decimal metadata.
+ withTempPath { path =>
+ val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d")
+ df.write.parquet(path.toString)
+
+ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
+ checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00"))
+ checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null))
+ checkAnswer(readParquet("b DECIMAL(11, 1)", path), sql("SELECT 123456.0"))
+ checkAnswer(readParquet("c DECIMAL(11, 1)", path), Row(null))
+ checkAnswer(readParquet("c DECIMAL(13, 0)", path), df.select("c"))
+ val e = intercept[SparkException] {
+ readParquet("d DECIMAL(3, 2)", path).collect()
+ }.getCause
+ assert(e.getMessage.contains("Please read this column/field as Spark BINARY type"))
+ }
+
+ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
+ Seq("a DECIMAL(3, 2)", "c DECIMAL(18, 1)", "d DECIMAL(37, 1)").foreach { schema =>
+ val e = intercept[SparkException] {
+ readParquet(schema, path).collect()
+ }.getCause.getCause
+ assert(e.isInstanceOf[SchemaColumnConvertNotSupportedException])
+ }
+ }
+ }
+ }
}
object TestingUDT {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org