You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2019/02/27 06:39:18 UTC

[spark] branch master updated: [SPARK-26990][SQL] FileIndex: use user specified field names if possible

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 95e5572  [SPARK-26990][SQL] FileIndex: use user specified field names if possible
95e5572 is described below

commit 95e55720d4ba12e870fc6edaa9d264698fd23560
Author: Gengliang Wang <ge...@databricks.com>
AuthorDate: Wed Feb 27 14:38:35 2019 +0800

    [SPARK-26990][SQL] FileIndex: use user specified field names if possible
    
    ## What changes were proposed in this pull request?
    
    WIth the following file structure:
    ```
    /tmp/data
    └── a=5
    ```
    
    In the previous release:
    ```
    scala> spark.read.schema("A int, ID long").parquet("/tmp/data/").printSchema
    root
     |-- ID: long (nullable = true)
     |-- A: integer (nullable = true)
    ```
    
    While in current code:
    ```
    scala> spark.read.schema("A int, ID long").parquet("/tmp/data/").printSchema
    root
     |-- ID: long (nullable = true)
     |-- a: integer (nullable = true)
    ```
    
    We can see that the partition column name `a` is different from `A` as user specifed. This PR is to fix the case and make it more user-friendly.
    
    ## How was this patch tested?
    
    Unit test
    
    Closes #23894 from gengliangwang/fileIndexSchema.
    
    Authored-by: Gengliang Wang <ge...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../sql/execution/datasources/PartitioningUtils.scala     | 11 ++++++++++-
 .../spark/sql/execution/datasources/FileIndexSuite.scala  | 15 +++++++++++++++
 2 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index a2e0818..0625cfb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -122,6 +122,13 @@ object PartitioningUtils {
       Map.empty[String, DataType]
     }
 
+    // SPARK-26990: use user specified field names if case insensitive.
+    val userSpecifiedNames = if (userSpecifiedSchema.isDefined && !caseSensitive) {
+      CaseInsensitiveMap(userSpecifiedSchema.get.fields.map(f => f.name -> f.name).toMap)
+    } else {
+      Map.empty[String, String]
+    }
+
     val dateFormatter = DateFormatter()
     val timestampFormatter = TimestampFormatter(timestampPartitionPattern, timeZone)
     // First, we need to parse every partition's path and see if we can find partition values.
@@ -170,7 +177,9 @@ object PartitioningUtils {
         columnNames.zip(literals).map { case (name, Literal(_, dataType)) =>
           // We always assume partition columns are nullable since we've no idea whether null values
           // will be appended in the future.
-          StructField(name, userSpecifiedDataTypes.getOrElse(name, dataType), nullable = true)
+          val resultName = userSpecifiedNames.getOrElse(name, name)
+          val resultDataType = userSpecifiedDataTypes.getOrElse(name, dataType)
+          StructField(resultName, resultDataType, nullable = true)
         }
       }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 6bd0a25..e0a3641 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -65,6 +65,21 @@ class FileIndexSuite extends SharedSQLContext {
     }
   }
 
+  test("SPARK-26990: use user specified field names if possible") {
+    withTempDir { dir =>
+      val partitionDirectory = new File(dir, "a=foo")
+      partitionDirectory.mkdir()
+      val file = new File(partitionDirectory, "text.txt")
+      stringToFile(file, "text")
+      val path = new Path(dir.getCanonicalPath)
+      val schema = StructType(Seq(StructField("A", StringType, false)))
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+        val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, Some(schema))
+        assert(fileIndex.partitionSchema.length == 1 && fileIndex.partitionSchema.head.name == "A")
+      }
+    }
+  }
+
   test("SPARK-26230: if case sensitive, validate partitions with original column names") {
     withTempDir { dir =>
       val partitionDirectory = new File(dir, "a=1")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org