You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2019/02/27 06:39:18 UTC
[spark] branch master updated: [SPARK-26990][SQL] FileIndex: use
user specified field names if possible
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 95e5572 [SPARK-26990][SQL] FileIndex: use user specified field names if possible
95e5572 is described below
commit 95e55720d4ba12e870fc6edaa9d264698fd23560
Author: Gengliang Wang <ge...@databricks.com>
AuthorDate: Wed Feb 27 14:38:35 2019 +0800
[SPARK-26990][SQL] FileIndex: use user specified field names if possible
## What changes were proposed in this pull request?
With the following file structure:
```
/tmp/data
└── a=5
```
In the previous release:
```
scala> spark.read.schema("A int, ID long").parquet("/tmp/data/").printSchema
root
|-- ID: long (nullable = true)
|-- A: integer (nullable = true)
```
While in current code:
```
scala> spark.read.schema("A int, ID long").parquet("/tmp/data/").printSchema
root
|-- ID: long (nullable = true)
|-- a: integer (nullable = true)
```
We can see that the partition column name `a` is different from `A` as the user specified. This PR is to fix the case and make it more user-friendly.
## How was this patch tested?
Unit test
Closes #23894 from gengliangwang/fileIndexSchema.
Authored-by: Gengliang Wang <ge...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../sql/execution/datasources/PartitioningUtils.scala | 11 ++++++++++-
.../spark/sql/execution/datasources/FileIndexSuite.scala | 15 +++++++++++++++
2 files changed, 25 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index a2e0818..0625cfb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -122,6 +122,13 @@ object PartitioningUtils {
Map.empty[String, DataType]
}
+ // SPARK-26990: use user specified field names if case insensitive.
+ val userSpecifiedNames = if (userSpecifiedSchema.isDefined && !caseSensitive) {
+ CaseInsensitiveMap(userSpecifiedSchema.get.fields.map(f => f.name -> f.name).toMap)
+ } else {
+ Map.empty[String, String]
+ }
+
val dateFormatter = DateFormatter()
val timestampFormatter = TimestampFormatter(timestampPartitionPattern, timeZone)
// First, we need to parse every partition's path and see if we can find partition values.
@@ -170,7 +177,9 @@ object PartitioningUtils {
columnNames.zip(literals).map { case (name, Literal(_, dataType)) =>
// We always assume partition columns are nullable since we've no idea whether null values
// will be appended in the future.
- StructField(name, userSpecifiedDataTypes.getOrElse(name, dataType), nullable = true)
+ val resultName = userSpecifiedNames.getOrElse(name, name)
+ val resultDataType = userSpecifiedDataTypes.getOrElse(name, dataType)
+ StructField(resultName, resultDataType, nullable = true)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 6bd0a25..e0a3641 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -65,6 +65,21 @@ class FileIndexSuite extends SharedSQLContext {
}
}
+ test("SPARK-26990: use user specified field names if possible") {
+ withTempDir { dir =>
+ val partitionDirectory = new File(dir, "a=foo")
+ partitionDirectory.mkdir()
+ val file = new File(partitionDirectory, "text.txt")
+ stringToFile(file, "text")
+ val path = new Path(dir.getCanonicalPath)
+ val schema = StructType(Seq(StructField("A", StringType, false)))
+ withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+ val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, Some(schema))
+ assert(fileIndex.partitionSchema.length == 1 && fileIndex.partitionSchema.head.name == "A")
+ }
+ }
+ }
+
test("SPARK-26230: if case sensitive, validate partitions with original column names") {
withTempDir { dir =>
val partitionDirectory = new File(dir, "a=1")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org