Posted to commits@spark.apache.org by we...@apache.org on 2021/04/13 09:21:46 UTC
[spark] branch branch-3.1 updated: [SPARK-34212][SQL][FOLLOWUP] Move the added test to ParquetQuerySuite
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 4abe6d6 [SPARK-34212][SQL][FOLLOWUP] Move the added test to ParquetQuerySuite
4abe6d6 is described below
commit 4abe6d68e0097345aa0e91e304ae18ef90ac14b8
Author: Yuming Wang <yu...@ebay.com>
AuthorDate: Tue Apr 13 09:04:47 2021 +0000
[SPARK-34212][SQL][FOLLOWUP] Move the added test to ParquetQuerySuite
This PR moves the added test from `SQLQuerySuite` to `ParquetQuerySuite`.
1. The test is then exercised by both `ParquetV1QuerySuite` and `ParquetV2QuerySuite` (see the sketch below).
2. It reduces the testing time of `SQLQuerySuite` (SQLQuerySuite: ~3 min 17 sec; ParquetV1QuerySuite: ~27 sec).
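For context, `ParquetQuerySuite` is an abstract suite whose tests are inherited by one concrete suite per file-source implementation, which is why the move buys V1 and V2 coverage for free. A simplified sketch of that layout (see ParquetQuerySuite.scala for the real definitions; the suites live in the same package as the `ParquetTest` trait):

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.QueryTest
    import org.apache.spark.sql.internal.SQLConf
    import org.apache.spark.sql.test.SharedSparkSession

    // Any test added here runs under both concrete suites below.
    abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSparkSession

    class ParquetV1QuerySuite extends ParquetQuerySuite {
      // Pin the legacy V1 file source for parquet.
      override protected def sparkConf: SparkConf =
        super.sparkConf.set(SQLConf.USE_V1_SOURCE_LIST, "parquet")
    }

    class ParquetV2QuerySuite extends ParquetQuerySuite {
      // An empty V1 list routes parquet reads through the DSv2 ParquetScan.
      override protected def sparkConf: SparkConf =
        super.sparkConf.set(SQLConf.USE_V1_SOURCE_LIST, "")
    }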
Does this PR introduce any user-facing change? No.
How was this patch tested? Unit test.
Closes #32090 from wangyum/SPARK-34212.
Authored-by: Yuming Wang <yu...@ebay.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../scala/org/apache/spark/sql/SQLQuerySuite.scala | 64 ----------------------
.../datasources/parquet/ParquetQuerySuite.scala | 63 ++++++++++++++++++++-
2 files changed, 62 insertions(+), 65 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index aa673dc..f88eee7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -33,7 +33,6 @@ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.command.FunctionsCommand
-import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
@@ -3787,69 +3786,6 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
}
}
- test("SPARK-34212 Parquet should read decimals correctly") {
- def readParquet(schema: String, path: File): DataFrame = {
- spark.read.schema(schema).parquet(path.toString)
- }
-
- withTempPath { path =>
- // a is int-decimal (4 bytes), b is long-decimal (8 bytes), c is binary-decimal (16 bytes)
- val df = sql("SELECT 1.0 a, CAST(1.23 AS DECIMAL(17, 2)) b, CAST(1.23 AS DECIMAL(36, 2)) c")
- df.write.parquet(path.toString)
-
- Seq(true, false).foreach { vectorizedReader =>
- withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorizedReader.toString) {
- // We can read the decimal parquet field with a larger precision, if scale is the same.
- val schema = "a DECIMAL(9, 1), b DECIMAL(18, 2), c DECIMAL(38, 2)"
- checkAnswer(readParquet(schema, path), df)
- }
- }
-
- withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
- val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)"
- checkAnswer(readParquet(schema1, path), df)
- val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)"
- checkAnswer(readParquet(schema2, path), Row(1, 1.2, 1.2))
- }
-
- withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
- Seq("a DECIMAL(3, 2)", "b DECIMAL(18, 1)", "c DECIMAL(37, 1)").foreach { schema =>
- val e = intercept[SparkException] {
- readParquet(schema, path).collect()
- }.getCause.getCause
- assert(e.isInstanceOf[SchemaColumnConvertNotSupportedException])
- }
- }
- }
-
- // tests for parquet types without decimal metadata.
- withTempPath { path =>
- val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d")
- df.write.parquet(path.toString)
-
- withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
- checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00"))
- checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null))
- checkAnswer(readParquet("b DECIMAL(11, 1)", path), sql("SELECT 123456.0"))
- checkAnswer(readParquet("c DECIMAL(11, 1)", path), Row(null))
- checkAnswer(readParquet("c DECIMAL(13, 0)", path), df.select("c"))
- val e = intercept[SparkException] {
- readParquet("d DECIMAL(3, 2)", path).collect()
- }.getCause
- assert(e.getMessage.contains("Please read this column/field as Spark BINARY type"))
- }
-
- withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
- Seq("a DECIMAL(3, 2)", "c DECIMAL(18, 1)", "d DECIMAL(37, 1)").foreach { schema =>
- val e = intercept[SparkException] {
- readParquet(schema, path).collect()
- }.getCause.getCause
- assert(e.isInstanceOf[SchemaColumnConvertNotSupportedException])
- }
- }
- }
- }
-
test("SPARK-34421: Resolve temporary objects in temporary views with CTEs") {
val tempFuncName = "temp_func"
withUserDefinedFunction(tempFuncName -> true) {
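One behavioral note on the second diff below: the first config block of the moved test now uses the `withAllParquetReaders` helper that `ParquetQuerySuite` already mixes in via `ParquetTest`, instead of the hand-rolled `Seq(true, false).foreach` loop. As a rough sketch of its behavior (the actual definition lives in the `ParquetTest` trait), it runs the block once per Parquet reader:

    // Run the block twice: once with the row-based Parquet reader and once
    // with the vectorized reader enabled.
    protected def withAllParquetReaders(code: => Unit): Unit = {
      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false")(code)
      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true")(code)
    }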
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 8f85fe3..9ef4399 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.execution.FileSourceScanExec
-import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
+import org.apache.spark.sql.execution.datasources.{SchemaColumnConvertNotSupportedException, SQLHadoopMapReduceCommitProtocol}
import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT, SingleElement}
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
@@ -840,6 +840,67 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
testMigration(fromTsType = "INT96", toTsType = "TIMESTAMP_MICROS")
testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96")
}
+
+ test("SPARK-34212 Parquet should read decimals correctly") {
+ def readParquet(schema: String, path: File): DataFrame = {
+ spark.read.schema(schema).parquet(path.toString)
+ }
+
+ withTempPath { path =>
+ // a is int-decimal (4 bytes), b is long-decimal (8 bytes), c is binary-decimal (16 bytes)
+ val df = sql("SELECT 1.0 a, CAST(1.23 AS DECIMAL(17, 2)) b, CAST(1.23 AS DECIMAL(36, 2)) c")
+ df.write.parquet(path.toString)
+
+ withAllParquetReaders {
+ // We can read the decimal parquet field with a larger precision, if scale is the same.
+ val schema = "a DECIMAL(9, 1), b DECIMAL(18, 2), c DECIMAL(38, 2)"
+ checkAnswer(readParquet(schema, path), df)
+ }
+
+ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
+ val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)"
+ checkAnswer(readParquet(schema1, path), df)
+ val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)"
+ checkAnswer(readParquet(schema2, path), Row(1, 1.2, 1.2))
+ }
+
+ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
+ Seq("a DECIMAL(3, 2)", "b DECIMAL(18, 1)", "c DECIMAL(37, 1)").foreach { schema =>
+ val e = intercept[SparkException] {
+ readParquet(schema, path).collect()
+ }.getCause.getCause
+ assert(e.isInstanceOf[SchemaColumnConvertNotSupportedException])
+ }
+ }
+ }
+
+ // tests for parquet types without decimal metadata.
+ withTempPath { path =>
+ val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d")
+ df.write.parquet(path.toString)
+
+ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
+ checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00"))
+ checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null))
+ checkAnswer(readParquet("b DECIMAL(11, 1)", path), sql("SELECT 123456.0"))
+ checkAnswer(readParquet("c DECIMAL(11, 1)", path), Row(null))
+ checkAnswer(readParquet("c DECIMAL(13, 0)", path), df.select("c"))
+ val e = intercept[SparkException] {
+ readParquet("d DECIMAL(3, 2)", path).collect()
+ }.getCause
+ assert(e.getMessage.contains("Please read this column/field as Spark BINARY type"))
+ }
+
+ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
+ Seq("a DECIMAL(3, 2)", "c DECIMAL(18, 1)", "d DECIMAL(37, 1)").foreach { schema =>
+ val e = intercept[SparkException] {
+ readParquet(schema, path).collect()
+ }.getCause.getCause
+ assert(e.isInstanceOf[SchemaColumnConvertNotSupportedException])
+ }
+ }
+ }
+ }
}
class ParquetV1QuerySuite extends ParquetQuerySuite {
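For readers who want the behavior under test in isolation: the test pins down when a Parquet decimal column may be read back under a different declared decimal type. A standalone spark-shell style sketch (the /tmp path is hypothetical, not part of the commit):

    // A value written as DECIMAL(17, 2) can be read back with a wider
    // precision as long as the scale is unchanged.
    val path = "/tmp/spark-34212-demo"
    spark.sql("SELECT CAST(1.23 AS DECIMAL(17, 2)) AS b")
      .write.mode("overwrite").parquet(path)

    // Same scale, wider precision: returns 1.23 under both readers.
    spark.read.schema("b DECIMAL(18, 2)").parquet(path).show()

    // Changing the scale (e.g. "b DECIMAL(18, 3)") only works with
    // spark.sql.parquet.enableVectorizedReader=false; the vectorized reader
    // fails with SchemaColumnConvertNotSupportedException, as the test asserts.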