You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/04/13 09:21:46 UTC

[spark] branch branch-3.1 updated: [SPARK-34212][SQL][FOLLOWUP] Move the added test to ParquetQuerySuite

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 4abe6d6  [SPARK-34212][SQL][FOLLOWUP] Move the added test to ParquetQuerySuite
4abe6d6 is described below

commit 4abe6d68e0097345aa0e91e304ae18ef90ac14b8
Author: Yuming Wang <yu...@ebay.com>
AuthorDate: Tue Apr 13 09:04:47 2021 +0000

    [SPARK-34212][SQL][FOLLOWUP] Move the added test to ParquetQuerySuite
    
    This PR moves the added test from `SQLQuerySuite` to `ParquetQuerySuite`.
    
    1. It can be tested by `ParquetV1QuerySuite` and `ParquetV2QuerySuite`.
    2. It reduces the testing time of `SQLQuerySuite` (SQLQuerySuite ~ 3 min 17 sec, ParquetV1QuerySuite ~ 27 sec).
    
    No user-facing change.
    
    Tested by the existing unit test, now run from `ParquetQuerySuite`.
    
    Closes #32090 from wangyum/SPARK-34212.
    
    Authored-by: Yuming Wang <yu...@ebay.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 64 ----------------------
 .../datasources/parquet/ParquetQuerySuite.scala    | 63 ++++++++++++++++++++-
 2 files changed, 62 insertions(+), 65 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index aa673dc..f88eee7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -33,7 +33,6 @@ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.command.FunctionsCommand
-import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
@@ -3787,69 +3786,6 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
     }
   }
 
-  test("SPARK-34212 Parquet should read decimals correctly") {
-    def readParquet(schema: String, path: File): DataFrame = {
-      spark.read.schema(schema).parquet(path.toString)
-    }
-
-    withTempPath { path =>
-      // a is int-decimal (4 bytes), b is long-decimal (8 bytes), c is binary-decimal (16 bytes)
-      val df = sql("SELECT 1.0 a, CAST(1.23 AS DECIMAL(17, 2)) b, CAST(1.23 AS DECIMAL(36, 2)) c")
-      df.write.parquet(path.toString)
-
-      Seq(true, false).foreach { vectorizedReader =>
-        withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorizedReader.toString) {
-          // We can read the decimal parquet field with a larger precision, if scale is the same.
-          val schema = "a DECIMAL(9, 1), b DECIMAL(18, 2), c DECIMAL(38, 2)"
-          checkAnswer(readParquet(schema, path), df)
-        }
-      }
-
-      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
-        val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)"
-        checkAnswer(readParquet(schema1, path), df)
-        val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)"
-        checkAnswer(readParquet(schema2, path), Row(1, 1.2, 1.2))
-      }
-
-      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
-        Seq("a DECIMAL(3, 2)", "b DECIMAL(18, 1)", "c DECIMAL(37, 1)").foreach { schema =>
-          val e = intercept[SparkException] {
-            readParquet(schema, path).collect()
-          }.getCause.getCause
-          assert(e.isInstanceOf[SchemaColumnConvertNotSupportedException])
-        }
-      }
-    }
-
-    // tests for parquet types without decimal metadata.
-    withTempPath { path =>
-      val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d")
-      df.write.parquet(path.toString)
-
-      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
-        checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00"))
-        checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null))
-        checkAnswer(readParquet("b DECIMAL(11, 1)", path), sql("SELECT 123456.0"))
-        checkAnswer(readParquet("c DECIMAL(11, 1)", path), Row(null))
-        checkAnswer(readParquet("c DECIMAL(13, 0)", path), df.select("c"))
-        val e = intercept[SparkException] {
-          readParquet("d DECIMAL(3, 2)", path).collect()
-        }.getCause
-        assert(e.getMessage.contains("Please read this column/field as Spark BINARY type"))
-      }
-
-      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
-        Seq("a DECIMAL(3, 2)", "c DECIMAL(18, 1)", "d DECIMAL(37, 1)").foreach { schema =>
-          val e = intercept[SparkException] {
-            readParquet(schema, path).collect()
-          }.getCause.getCause
-          assert(e.isInstanceOf[SchemaColumnConvertNotSupportedException])
-        }
-      }
-    }
-  }
-
   test("SPARK-34421: Resolve temporary objects in temporary views with CTEs") {
     val tempFuncName = "temp_func"
     withUserDefinedFunction(tempFuncName -> true) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 8f85fe3..9ef4399 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
 import org.apache.spark.sql.execution.FileSourceScanExec
-import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
+import org.apache.spark.sql.execution.datasources.{SchemaColumnConvertNotSupportedException, SQLHadoopMapReduceCommitProtocol}
 import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT, SingleElement}
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
@@ -840,6 +840,67 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
     testMigration(fromTsType = "INT96", toTsType = "TIMESTAMP_MICROS")
     testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96")
   }
+
+  test("SPARK-34212 Parquet should read decimals correctly") {
+    def readParquet(schema: String, path: File): DataFrame = {
+      spark.read.schema(schema).parquet(path.toString)
+    }
+
+    withTempPath { path =>
+      // a is int-decimal (4 bytes), b is long-decimal (8 bytes), c is binary-decimal (16 bytes)
+      val df = sql("SELECT 1.0 a, CAST(1.23 AS DECIMAL(17, 2)) b, CAST(1.23 AS DECIMAL(36, 2)) c")
+      df.write.parquet(path.toString)
+
+      withAllParquetReaders {
+        // We can read the decimal parquet field with a larger precision, if scale is the same.
+        val schema = "a DECIMAL(9, 1), b DECIMAL(18, 2), c DECIMAL(38, 2)"
+        checkAnswer(readParquet(schema, path), df)
+      }
+
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
+        val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)"
+        checkAnswer(readParquet(schema1, path), df)
+        val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)"
+        checkAnswer(readParquet(schema2, path), Row(1, 1.2, 1.2))
+      }
+
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
+        Seq("a DECIMAL(3, 2)", "b DECIMAL(18, 1)", "c DECIMAL(37, 1)").foreach { schema =>
+          val e = intercept[SparkException] {
+            readParquet(schema, path).collect()
+          }.getCause.getCause
+          assert(e.isInstanceOf[SchemaColumnConvertNotSupportedException])
+        }
+      }
+    }
+
+    // tests for parquet types without decimal metadata.
+    withTempPath { path =>
+      val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d")
+      df.write.parquet(path.toString)
+
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
+        checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00"))
+        checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null))
+        checkAnswer(readParquet("b DECIMAL(11, 1)", path), sql("SELECT 123456.0"))
+        checkAnswer(readParquet("c DECIMAL(11, 1)", path), Row(null))
+        checkAnswer(readParquet("c DECIMAL(13, 0)", path), df.select("c"))
+        val e = intercept[SparkException] {
+          readParquet("d DECIMAL(3, 2)", path).collect()
+        }.getCause
+        assert(e.getMessage.contains("Please read this column/field as Spark BINARY type"))
+      }
+
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
+        Seq("a DECIMAL(3, 2)", "c DECIMAL(18, 1)", "d DECIMAL(37, 1)").foreach { schema =>
+          val e = intercept[SparkException] {
+            readParquet(schema, path).collect()
+          }.getCause.getCause
+          assert(e.isInstanceOf[SchemaColumnConvertNotSupportedException])
+        }
+      }
+    }
+  }
 }
 
 class ParquetV1QuerySuite extends ParquetQuerySuite {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org