You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/12/03 11:53:55 UTC
spark git commit: [SPARK-26230][SQL] FileIndex: if case sensitive,
validate partitions with original column names
Repository: spark
Updated Branches:
refs/heads/master 11e5f1bcd -> b569ba53f
[SPARK-26230][SQL] FileIndex: if case sensitive, validate partitions with original column names
## What changes were proposed in this pull request?
Partition directories under the same parent directory are required to all use the same partition column name. For example, the following paths form an invalid partitioned directory layout:
```
hdfs://host:9000/path/a=1
hdfs://host:9000/path/b=2
```
If case sensitivity is enabled, the following paths should be invalid too:
```
hdfs://host:9000/path/a=1
hdfs://host:9000/path/A=2
```
Since columns 'a' and 'A' are different, it is wrong to use either one as the column name in the partition schema.
Also, there is a `TODO` comment in the code. Currently, Spark doesn't validate such cases when `CASE_SENSITIVE` is enabled.
This PR is to resolve the problem.
## How was this patch tested?
Added a unit test.
Closes #23186 from gengliangwang/SPARK-26230.
Authored-by: Gengliang Wang <ge...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b569ba53
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b569ba53
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b569ba53
Branch: refs/heads/master
Commit: b569ba53f4b650c03bd11def7c7f7589ceff61eb
Parents: 11e5f1b
Author: Gengliang Wang <ge...@databricks.com>
Authored: Mon Dec 3 19:53:45 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Mon Dec 3 19:53:45 2018 +0800
----------------------------------------------------------------------
.../datasources/PartitioningUtils.scala | 14 ++++++---
.../execution/datasources/FileIndexSuite.scala | 32 +++++++++++++++++++-
2 files changed, 40 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/b569ba53/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 9d2c9ba..d66cb09 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -155,7 +155,8 @@ object PartitioningUtils {
"root directory of the table. If there are multiple root directories, " +
"please load them separately and then union them.")
- val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues, timeZone)
+ val resolvedPartitionValues =
+ resolvePartitions(pathsWithPartitionValues, caseSensitive, timeZone)
// Creates the StructType which represents the partition columns.
val fields = {
@@ -345,15 +346,18 @@ object PartitioningUtils {
*/
def resolvePartitions(
pathsWithPartitionValues: Seq[(Path, PartitionValues)],
+ caseSensitive: Boolean,
timeZone: TimeZone): Seq[PartitionValues] = {
if (pathsWithPartitionValues.isEmpty) {
Seq.empty
} else {
- // TODO: Selective case sensitivity.
- val distinctPartColNames =
- pathsWithPartitionValues.map(_._2.columnNames.map(_.toLowerCase())).distinct
+ val partColNames = if (caseSensitive) {
+ pathsWithPartitionValues.map(_._2.columnNames)
+ } else {
+ pathsWithPartitionValues.map(_._2.columnNames.map(_.toLowerCase()))
+ }
assert(
- distinctPartColNames.size == 1,
+ partColNames.distinct.size == 1,
listConflictingPartitionColumns(pathsWithPartitionValues))
// Resolves possible type conflicts for each column
http://git-wip-us.apache.org/repos/asf/spark/blob/b569ba53/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index fdb0511..ec552f7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -52,7 +52,7 @@ class FileIndexSuite extends SharedSQLContext {
test("SPARK-26188: don't infer data types of partition columns if user specifies schema") {
withTempDir { dir =>
- val partitionDirectory = new File(dir, s"a=4d")
+ val partitionDirectory = new File(dir, "a=4d")
partitionDirectory.mkdir()
val file = new File(partitionDirectory, "text.txt")
stringToFile(file, "text")
@@ -65,6 +65,36 @@ class FileIndexSuite extends SharedSQLContext {
}
}
+ test("SPARK-26230: if case sensitive, validate partitions with original column names") {
+ withTempDir { dir =>
+ val partitionDirectory = new File(dir, "a=1")
+ partitionDirectory.mkdir()
+ val file = new File(partitionDirectory, "text.txt")
+ stringToFile(file, "text")
+ val partitionDirectory2 = new File(dir, "A=2")
+ partitionDirectory2.mkdir()
+ val file2 = new File(partitionDirectory2, "text.txt")
+ stringToFile(file2, "text")
+ val path = new Path(dir.getCanonicalPath)
+
+ withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+ val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, None)
+ val partitionValues = fileIndex.partitionSpec().partitions.map(_.values)
+ assert(partitionValues.length == 2)
+ }
+
+ withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+ val msg = intercept[AssertionError] {
+ val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, None)
+ fileIndex.partitionSpec()
+ }.getMessage
+ assert(msg.contains("Conflicting partition column names detected"))
+ assert("Partition column name list #[0-1]: A".r.findFirstIn(msg).isDefined)
+ assert("Partition column name list #[0-1]: a".r.findFirstIn(msg).isDefined)
+ }
+ }
+ }
+
test("InMemoryFileIndex: input paths are converted to qualified paths") {
withTempDir { dir =>
val file = new File(dir, "text.txt")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org