You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/12/03 11:53:55 UTC

spark git commit: [SPARK-26230][SQL] FileIndex: if case sensitive, validate partitions with original column names

Repository: spark
Updated Branches:
  refs/heads/master 11e5f1bcd -> b569ba53f


[SPARK-26230][SQL] FileIndex: if case sensitive, validate partitions with original column names

## What changes were proposed in this pull request?

Partition column name is required to be unique under the same directory. The following paths are invalid partitioned directory:
```
hdfs://host:9000/path/a=1
hdfs://host:9000/path/b=2
```

If case sensitive, the following paths should be invalid too:
```
hdfs://host:9000/path/a=1
hdfs://host:9000/path/A=2
```
Since column 'a' and 'A' are different, and it is wrong to use either one as the column name in partition schema.

Also, there is a `TODO` comment in the code. Currently the Spark doesn't validate such case when `CASE_SENSITIVE` enabled.

This PR is to resolve the problem.

## How was this patch tested?

Add unit test

Closes #23186 from gengliangwang/SPARK-26230.

Authored-by: Gengliang Wang <ge...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b569ba53
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b569ba53
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b569ba53

Branch: refs/heads/master
Commit: b569ba53f4b650c03bd11def7c7f7589ceff61eb
Parents: 11e5f1b
Author: Gengliang Wang <ge...@databricks.com>
Authored: Mon Dec 3 19:53:45 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Mon Dec 3 19:53:45 2018 +0800

----------------------------------------------------------------------
 .../datasources/PartitioningUtils.scala         | 14 ++++++---
 .../execution/datasources/FileIndexSuite.scala  | 32 +++++++++++++++++++-
 2 files changed, 40 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b569ba53/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 9d2c9ba..d66cb09 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -155,7 +155,8 @@ object PartitioningUtils {
           "root directory of the table. If there are multiple root directories, " +
           "please load them separately and then union them.")
 
-      val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues, timeZone)
+      val resolvedPartitionValues =
+        resolvePartitions(pathsWithPartitionValues, caseSensitive, timeZone)
 
       // Creates the StructType which represents the partition columns.
       val fields = {
@@ -345,15 +346,18 @@ object PartitioningUtils {
    */
   def resolvePartitions(
       pathsWithPartitionValues: Seq[(Path, PartitionValues)],
+      caseSensitive: Boolean,
       timeZone: TimeZone): Seq[PartitionValues] = {
     if (pathsWithPartitionValues.isEmpty) {
       Seq.empty
     } else {
-      // TODO: Selective case sensitivity.
-      val distinctPartColNames =
-        pathsWithPartitionValues.map(_._2.columnNames.map(_.toLowerCase())).distinct
+      val partColNames = if (caseSensitive) {
+        pathsWithPartitionValues.map(_._2.columnNames)
+      } else {
+        pathsWithPartitionValues.map(_._2.columnNames.map(_.toLowerCase()))
+      }
       assert(
-        distinctPartColNames.size == 1,
+        partColNames.distinct.size == 1,
         listConflictingPartitionColumns(pathsWithPartitionValues))
 
       // Resolves possible type conflicts for each column

http://git-wip-us.apache.org/repos/asf/spark/blob/b569ba53/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index fdb0511..ec552f7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -52,7 +52,7 @@ class FileIndexSuite extends SharedSQLContext {
 
   test("SPARK-26188: don't infer data types of partition columns if user specifies schema") {
     withTempDir { dir =>
-      val partitionDirectory = new File(dir, s"a=4d")
+      val partitionDirectory = new File(dir, "a=4d")
       partitionDirectory.mkdir()
       val file = new File(partitionDirectory, "text.txt")
       stringToFile(file, "text")
@@ -65,6 +65,36 @@ class FileIndexSuite extends SharedSQLContext {
     }
   }
 
+  test("SPARK-26230: if case sensitive, validate partitions with original column names") {
+    withTempDir { dir =>
+      val partitionDirectory = new File(dir, "a=1")
+      partitionDirectory.mkdir()
+      val file = new File(partitionDirectory, "text.txt")
+      stringToFile(file, "text")
+      val partitionDirectory2 = new File(dir, "A=2")
+      partitionDirectory2.mkdir()
+      val file2 = new File(partitionDirectory2, "text.txt")
+      stringToFile(file2, "text")
+      val path = new Path(dir.getCanonicalPath)
+
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+        val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, None)
+        val partitionValues = fileIndex.partitionSpec().partitions.map(_.values)
+        assert(partitionValues.length == 2)
+      }
+
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+        val msg = intercept[AssertionError] {
+          val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, None)
+          fileIndex.partitionSpec()
+        }.getMessage
+        assert(msg.contains("Conflicting partition column names detected"))
+        assert("Partition column name list #[0-1]: A".r.findFirstIn(msg).isDefined)
+        assert("Partition column name list #[0-1]: a".r.findFirstIn(msg).isDefined)
+      }
+    }
+  }
+
   test("InMemoryFileIndex: input paths are converted to qualified paths") {
     withTempDir { dir =>
       val file = new File(dir, "text.txt")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org