You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2021/01/27 17:36:06 UTC

[spark] branch branch-3.1 updated: [SPARK-34212][SQL][FOLLOWUP] Refine the behavior of reading parquet non-decimal fields as decimal

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 5a2eb64  [SPARK-34212][SQL][FOLLOWUP] Refine the behavior of reading parquet non-decimal fields as decimal
5a2eb64 is described below

commit 5a2eb64a0d30051e6ce19ba62a89192099ca9b67
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Wed Jan 27 09:34:31 2021 -0800

    [SPARK-34212][SQL][FOLLOWUP] Refine the behavior of reading parquet non-decimal fields as decimal
    
    ### What changes were proposed in this pull request?
    
    This is a followup of https://github.com/apache/spark/pull/31319 .
    
    When reading parquet int/long as decimal, the behavior should be the same as reading int/long and then cast to the decimal type. This PR changes to the expected behavior.
    
    When reading parquet binary as decimal, we don't really know how to interpret the binary (it may from a string), and should fail. This PR changes to the expected behavior.
    
    ### Why are the changes needed?
    
    To make the behavior more sane.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, but it's a followup.
    
    ### How was this patch tested?
    
    updated test
    
    Closes #31357 from cloud-fan/bug.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit 2dbb7d5af8f498e49488cd8876bd3d0b083723b7)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../datasources/parquet/ParquetRowConverter.scala  | 48 +++++++++++-------
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 58 ++++++++++++----------
 2 files changed, 60 insertions(+), 46 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 32db964..dca12ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -243,19 +243,6 @@ private[parquet] class ParquetRowConverter(
   }
 
   /**
-   * Get a precision and a scale to interpret parquet decimal values.
-   * 1. If there is a decimal metadata, we read decimal values with the given precision and scale.
-   * 2. If there is no metadata, we read decimal values with scale `0` because it's plain integers
-   *    when it is written into INT32/INT64/BINARY/FIXED_LEN_BYTE_ARRAY types.
-   */
-  private def getPrecisionAndScale(parquetType: Type, t: DecimalType): (Int, Int) = {
-    val metadata = parquetType.asPrimitiveType().getDecimalMetadata
-    val precision = if (metadata == null) t.precision else metadata.getPrecision()
-    val scale = if (metadata == null) 0 else metadata.getScale()
-    (precision, scale)
-  }
-
-  /**
    * Creates a converter for the given Parquet type `parquetType` and Spark SQL data type
    * `catalystType`. Converted values are handled by `updater`.
    */
@@ -282,20 +269,43 @@ private[parquet] class ParquetRowConverter(
 
       // For INT32 backed decimals
       case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT32 =>
-        val (precision, scale) = getPrecisionAndScale(parquetType, t)
-        new ParquetIntDictionaryAwareDecimalConverter(precision, scale, updater)
+        val metadata = parquetType.asPrimitiveType().getDecimalMetadata
+        if (metadata == null) {
+          // If the column is a plain INT32, we should pick the precision that can host the largest
+          // INT32 value.
+          new ParquetIntDictionaryAwareDecimalConverter(
+            DecimalType.IntDecimal.precision, 0, updater)
+        } else {
+          new ParquetIntDictionaryAwareDecimalConverter(
+            metadata.getPrecision, metadata.getScale, updater)
+        }
 
       // For INT64 backed decimals
       case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 =>
-        val (precision, scale) = getPrecisionAndScale(parquetType, t)
-        new ParquetLongDictionaryAwareDecimalConverter(precision, scale, updater)
+        val metadata = parquetType.asPrimitiveType().getDecimalMetadata
+        if (metadata == null) {
+          // If the column is a plain INT64, we should pick the precision that can host the largest
+          // INT64 value.
+          new ParquetLongDictionaryAwareDecimalConverter(
+            DecimalType.LongDecimal.precision, 0, updater)
+        } else {
+          new ParquetLongDictionaryAwareDecimalConverter(
+            metadata.getPrecision, metadata.getScale, updater)
+        }
 
       // For BINARY and FIXED_LEN_BYTE_ARRAY backed decimals
       case t: DecimalType
         if parquetType.asPrimitiveType().getPrimitiveTypeName == FIXED_LEN_BYTE_ARRAY ||
            parquetType.asPrimitiveType().getPrimitiveTypeName == BINARY =>
-        val (precision, scale) = getPrecisionAndScale(parquetType, t)
-        new ParquetBinaryDictionaryAwareDecimalConverter(precision, scale, updater)
+        val metadata = parquetType.asPrimitiveType().getDecimalMetadata
+        if (metadata == null) {
+          throw new RuntimeException(s"Unable to create Parquet converter for ${t.typeName} " +
+            s"whose Parquet type is $parquetType without decimal metadata. Please read this " +
+            "column/field as Spark BINARY type." )
+        } else {
+          new ParquetBinaryDictionaryAwareDecimalConverter(
+            metadata.getPrecision, metadata.getScale, updater)
+        }
 
       case t: DecimalType =>
         throw new RuntimeException(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 14f56a8..d2a578b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -3776,52 +3776,56 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
   }
 
   test("SPARK-34212 Parquet should read decimals correctly") {
-    // a is int-decimal (4 bytes), b is long-decimal (8 bytes), c is binary-decimal (16 bytes)
-    val df = sql("SELECT 1.0 a, CAST(1.23 AS DECIMAL(17, 2)) b, CAST(1.23 AS DECIMAL(36, 2)) c")
+    def readParquet(schema: String, path: File): DataFrame = {
+      spark.read.schema(schema).parquet(path.toString)
+    }
 
     withTempPath { path =>
+      // a is int-decimal (4 bytes), b is long-decimal (8 bytes), c is binary-decimal (16 bytes)
+      val df = sql("SELECT 1.0 a, CAST(1.23 AS DECIMAL(17, 2)) b, CAST(1.23 AS DECIMAL(36, 2)) c")
       df.write.parquet(path.toString)
 
       withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
         val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)"
-        checkAnswer(spark.read.schema(schema1).parquet(path.toString), df)
+        checkAnswer(readParquet(schema1, path), df)
         val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)"
-        checkAnswer(spark.read.schema(schema2).parquet(path.toString), Row(1, 1.2, 1.2))
+        checkAnswer(readParquet(schema2, path), Row(1, 1.2, 1.2))
       }
 
       withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
-        val e1 = intercept[SparkException] {
-          spark.read.schema("a DECIMAL(3, 2)").parquet(path.toString).collect()
-        }.getCause.getCause
-        assert(e1.isInstanceOf[SchemaColumnConvertNotSupportedException])
-
-        val e2 = intercept[SparkException] {
-          spark.read.schema("b DECIMAL(18, 1)").parquet(path.toString).collect()
-        }.getCause.getCause
-        assert(e2.isInstanceOf[SchemaColumnConvertNotSupportedException])
-
-        val e3 = intercept[SparkException] {
-          spark.read.schema("c DECIMAL(37, 1)").parquet(path.toString).collect()
-        }.getCause.getCause
-        assert(e3.isInstanceOf[SchemaColumnConvertNotSupportedException])
+        Seq("a DECIMAL(3, 2)", "b DECIMAL(18, 1)", "c DECIMAL(37, 1)").foreach { schema =>
+          val e = intercept[SparkException] {
+            readParquet(schema, path).collect()
+          }.getCause.getCause
+          assert(e.isInstanceOf[SchemaColumnConvertNotSupportedException])
+        }
       }
     }
 
+    // tests for parquet types without decimal metadata.
     withTempPath { path =>
-      val df2 = sql(s"SELECT 1 a, ${Int.MaxValue + 1L} b, CAST(2 AS BINARY) c")
-      df2.write.parquet(path.toString)
+      val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d")
+      df.write.parquet(path.toString)
 
       withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
-        val schema = "a DECIMAL(3, 2), b DECIMAL(17, 2), c DECIMAL(36, 2)"
-        checkAnswer(spark.read.schema(schema).parquet(path.toString),
-          Row(BigDecimal(100, 2), BigDecimal((Int.MaxValue + 1L) * 100, 2), BigDecimal(200, 2)))
+        checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00"))
+        checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null))
+        checkAnswer(readParquet("b DECIMAL(11, 1)", path), sql("SELECT 123456.0"))
+        checkAnswer(readParquet("c DECIMAL(11, 1)", path), Row(null))
+        checkAnswer(readParquet("c DECIMAL(13, 0)", path), df.select("c"))
+        val e = intercept[SparkException] {
+          readParquet("d DECIMAL(3, 2)", path).collect()
+        }.getCause
+        assert(e.getMessage.contains("Please read this column/field as Spark BINARY type"))
       }
 
       withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
-        val e = intercept[SparkException] {
-          spark.read.schema("a DECIMAL(3, 2)").parquet(path.toString).collect()
-        }.getCause.getCause
-        assert(e.isInstanceOf[SchemaColumnConvertNotSupportedException])
+        Seq("a DECIMAL(3, 2)", "c DECIMAL(18, 1)", "d DECIMAL(37, 1)").foreach { schema =>
+          val e = intercept[SparkException] {
+            readParquet(schema, path).collect()
+          }.getCause.getCause
+          assert(e.isInstanceOf[SchemaColumnConvertNotSupportedException])
+        }
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org