You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/03/16 17:34:43 UTC

[spark] branch branch-3.0 updated: [SPARK-31116][SQL] Fix nested schema case-sensitivity in ParquetRowConverter

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new da1f95b  [SPARK-31116][SQL] Fix nested schema case-sensitivity in ParquetRowConverter
da1f95b is described below

commit da1f95be6b9af59a91a14e01613bdc4e8ac35374
Author: Tae-kyeom, Kim <ki...@devsisters.com>
AuthorDate: Mon Mar 16 10:31:56 2020 -0700

    [SPARK-31116][SQL] Fix nested schema case-sensitivity in ParquetRowConverter
    
    ### What changes were proposed in this pull request?
    
    This PR (SPARK-31116) add caseSensitive parameter to ParquetRowConverter so that it handle materialize parquet properly with respect to case sensitivity
    
    ### Why are the changes needed?
    
    From spark 3.0.0, below statement throws IllegalArgumentException in caseInsensitive mode because of explicit field index searching in ParquetRowConverter. As we already constructed parquet requested schema and catalyst requested schema during schema clipping in ParquetReadSupport, just follow these behavior.
    
    ```scala
    val path = "/some/temp/path"
    
    spark
      .range(1L)
      .selectExpr("NAMED_STRUCT('lowercase', id, 'camelCase', id + 1) AS StructColumn")
      .write.parquet(path)
    
    val caseInsensitiveSchema = new StructType()
      .add(
        "StructColumn",
        new StructType()
          .add("LowerCase", LongType)
          .add("camelcase", LongType))
    
    spark.read.schema(caseInsensitiveSchema).parquet(path).show()
    ```
    
    ### Does this PR introduce any user-facing change?
    
    No. The changes are only in unreleased branches (`master` and `branch-3.0`).
    
    ### How was this patch tested?
    
    Passed new test cases that check parquet column selection with respect to schemas and case sensitivities
    
    Closes #27888 from kimtkyeom/parquet_row_converter_case_sensitivity.
    
    Authored-by: Tae-kyeom, Kim <ki...@devsisters.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
    (cherry picked from commit e736c62764137b2c3af90d2dc8a77e391891200a)
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../datasources/parquet/ParquetRowConverter.scala  | 12 +++++--
 .../spark/sql/FileBasedDataSourceSuite.scala       | 40 ++++++++++++++++++++++
 2 files changed, 50 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 850adae..22422c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -33,8 +33,9 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, DOUBLE
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -178,8 +179,15 @@ private[parquet] class ParquetRowConverter(
 
   // Converters for each field.
   private[this] val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
+    // (SPARK-31116) Use case insensitive map if spark.sql.caseSensitive is false
+    // to prevent throwing IllegalArgumentException when searching catalyst type's field index
+    val catalystFieldNameToIndex = if (SQLConf.get.caseSensitiveAnalysis) {
+      catalystType.fieldNames.zipWithIndex.toMap
+    } else {
+      CaseInsensitiveMap(catalystType.fieldNames.zipWithIndex.toMap)
+    }
     parquetType.getFields.asScala.map { parquetField =>
-      val fieldIndex = catalystType.fieldIndex(parquetField.getName)
+      val fieldIndex = catalystFieldNameToIndex(parquetField.getName)
       val catalystField = catalystType(fieldIndex)
       // Converted field value should be set to the `fieldIndex`-th cell of `currentRow`
       newConverter(parquetField, catalystField.dataType, new RowUpdater(currentRow, fieldIndex))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index c870958..cb410b4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -842,6 +842,46 @@ class FileBasedDataSourceSuite extends QueryTest
       }
     }
   }
+
+  test("SPARK-31116: Select nested schema with case insensitive mode") {
+    // This test case failed at only Parquet. ORC is added for test coverage parity.
+    Seq("orc", "parquet").foreach { format =>
+      Seq("true", "false").foreach { nestedSchemaPruningEnabled =>
+        withSQLConf(
+          SQLConf.CASE_SENSITIVE.key -> "false",
+          SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key -> nestedSchemaPruningEnabled) {
+          withTempPath { dir =>
+            val path = dir.getCanonicalPath
+
+            // Prepare values for testing nested parquet data
+            spark
+              .range(1L)
+              .selectExpr("NAMED_STRUCT('lowercase', id, 'camelCase', id + 1) AS StructColumn")
+              .write
+              .format(format)
+              .save(path)
+
+            val exactSchema = "StructColumn struct<lowercase: LONG, camelCase: LONG>"
+
+            checkAnswer(spark.read.schema(exactSchema).format(format).load(path), Row(Row(0, 1)))
+
+            // In case insensitive manner, parquet's column cases are ignored
+            val innerColumnCaseInsensitiveSchema =
+              "StructColumn struct<Lowercase: LONG, camelcase: LONG>"
+            checkAnswer(
+              spark.read.schema(innerColumnCaseInsensitiveSchema).format(format).load(path),
+              Row(Row(0, 1)))
+
+            val rootColumnCaseInsensitiveSchema =
+              "structColumn struct<lowercase: LONG, camelCase: LONG>"
+            checkAnswer(
+              spark.read.schema(rootColumnCaseInsensitiveSchema).format(format).load(path),
+              Row(Row(0, 1)))
+          }
+        }
+      }
+    }
+  }
 }
 
 object TestingUDT {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org