Posted to commits@spark.apache.org by we...@apache.org on 2016/09/06 11:36:20 UTC

spark git commit: [SPARK-11301][SQL] Fix case sensitivity for filter on partitioned col…

Repository: spark
Updated Branches:
  refs/heads/branch-1.6 21be94b16 -> 958039a14


[SPARK-11301][SQL] Fix case sensitivity for filter on partitioned col…

## What changes were proposed in this pull request?

`DataSourceStrategy` does not consider the `SQLConf` in `Context` and always matches column names case-sensitively. For instance, `HiveContext` uses a case-insensitive configuration, but it is ignored in `DataSourceStrategy`. This issue was originally registered as SPARK-11301 against 1.6.0 and seemed to be fixed at that time, but Apache Spark 1.6.2 still always handles **partitioned column names** in a case-sensitive way. This is incorrect, as the following shows.

```scala
scala> sql("CREATE TABLE t(a int) PARTITIONED BY (b string) STORED AS PARQUET")
scala> sql("INSERT INTO TABLE t PARTITION(b='P') SELECT * FROM (SELECT 1) t")
scala> sql("INSERT INTO TABLE t PARTITION(b='Q') SELECT * FROM (SELECT 2) t")
scala> sql("SELECT * FROM T WHERE B='P'").show
+---+---+
|  a|  b|
+---+---+
|  1|  P|
|  2|  Q|
+---+---+
```

The result is the same even with `set spark.sql.caseSensitive=false`. Here is the result in [Databricks CE](https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/6660119172909095/3421754458488607/5162191866050912/latest.html) .
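
For reference, with case-insensitive analysis the filter should apply to the partition column `b`, so the corrected behavior would look like this (a sketch continuing the session above, not a captured run):

```scala
scala> sql("SET spark.sql.caseSensitive=false")
scala> sql("SELECT * FROM T WHERE B='P'").show
+---+---+
|  a|  b|
+---+---+
|  1|  P|
+---+---+
```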

This PR reads the configuration and handles the column name comparison accordingly.
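
In essence, the fix picks a name-comparison rule based on `spark.sql.caseSensitive`. A minimal standalone sketch of that rule (the helper name is illustrative only; the actual change lives in `DataSourceStrategy`, shown in the diff below):

```scala
// Illustrative helper, not part of the patch: compare column names
// according to the case-sensitivity setting.
def sameColumn(a: String, b: String, caseSensitive: Boolean): Boolean =
  if (caseSensitive) a == b else a.equalsIgnoreCase(b)

sameColumn("yEAr", "year", caseSensitive = false)  // true
sameColumn("yEAr", "year", caseSensitive = true)   // false
```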

## How was this patch tested?

Pass the Jenkins tests with a modified test.

Author: Dongjoon Hyun <do...@apache.org>

Closes #14970 from dongjoon-hyun/SPARK-11301.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/958039a1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/958039a1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/958039a1

Branch: refs/heads/branch-1.6
Commit: 958039a14e93bb4bab6074ab11d3b168fd2e023e
Parents: 21be94b
Author: Dongjoon Hyun <do...@apache.org>
Authored: Tue Sep 6 19:36:12 2016 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Sep 6 19:36:12 2016 +0800

----------------------------------------------------------------------
 .../datasources/DataSourceStrategy.scala         | 19 ++++++++++++++++---
 .../org/apache/spark/sql/DataFrameSuite.scala    |  3 ++-
 2 files changed, 18 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/958039a1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 3741a9c..aa502c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -255,9 +255,18 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       predicates: Seq[Expression],
       partitionSpec: PartitionSpec): Seq[Partition] = {
     val PartitionSpec(partitionColumns, partitions) = partitionSpec
-    val partitionColumnNames = partitionColumns.map(_.name).toSet
+    val isCaseSensitive = SQLContext.getActive().get.conf.caseSensitiveAnalysis
+    val partitionColumnNames = if (isCaseSensitive) {
+      partitionColumns.map(_.name).toSet
+    } else {
+      partitionColumns.map(_.name.toLowerCase).toSet
+    }
     val partitionPruningPredicates = predicates.filter {
-      _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
+      if (isCaseSensitive) {
+        _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
+      } else {
+        _.references.map(_.name.toLowerCase).toSet.subsetOf(partitionColumnNames)
+      }
     }
 
     if (partitionPruningPredicates.nonEmpty) {
@@ -268,7 +277,11 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
 
       val boundPredicate = InterpretedPredicate.create(predicate.transform {
         case a: AttributeReference =>
-          val index = partitionColumns.indexWhere(a.name == _.name)
+          val index = if (isCaseSensitive) {
+            partitionColumns.indexWhere(a.name == _.name)
+          } else {
+            partitionColumns.indexWhere(c => a.name.equalsIgnoreCase(c.name))
+          }
           BoundReference(index, partitionColumns(index).dataType, nullable = true)
       })
 

http://git-wip-us.apache.org/repos/asf/spark/blob/958039a1/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 06436a1..542e4c2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1013,7 +1013,8 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
   test("SPARK-11301: fix case sensitivity for filter on partitioned columns") {
     withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
       withTempPath { path =>
-        Seq(2012 -> "a").toDF("year", "val").write.partitionBy("year").parquet(path.getAbsolutePath)
+        Seq(2012 -> "a", 1999 -> "b").toDF("year", "val").write.partitionBy("year")
+          .parquet(path.getAbsolutePath)
         val df = sqlContext.read.parquet(path.getAbsolutePath)
         checkAnswer(df.filter($"yEAr" > 2000).select($"val"), Row("a"))
       }

