Posted to commits@spark.apache.org by we...@apache.org on 2016/09/06 11:36:20 UTC
spark git commit: [SPARK-11301][SQL] Fix case sensitivity for filter on partitioned col…
Repository: spark
Updated Branches:
refs/heads/branch-1.6 21be94b16 -> 958039a14
[SPARK-11301][SQL] Fix case sensitivity for filter on partitioned col…
## What changes were proposed in this pull request?
`DataSourceStrategy` does not consider the `SQLConf` of the `Context` and always matches column names case-sensitively. For instance, `HiveContext` uses a case-insensitive configuration, but it is ignored in `DataSourceStrategy`. This issue was originally reported as SPARK-11301 against 1.6.0 and appeared to be fixed at that time, but Apache Spark 1.6.2 still always handles **partitioned column names** in a case-sensitive way. This produces incorrect results, as the following shows.
```scala
scala> sql("CREATE TABLE t(a int) PARTITIONED BY (b string) STORED AS PARQUET")
scala> sql("INSERT INTO TABLE t PARTITION(b='P') SELECT * FROM (SELECT 1) t")
scala> sql("INSERT INTO TABLE t PARTITION(b='Q') SELECT * FROM (SELECT 2) t")
scala> sql("SELECT * FROM T WHERE B='P'").show
+---+---+
| a| b|
+---+---+
| 1| P|
| 2| Q|
+---+---+
```
The result is the same even with `set spark.sql.caseSensitive=false`. Here is the result in [Databricks CE](https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/6660119172909095/3421754458488607/5162191866050912/latest.html).
This PR reads the configuration and handles the column name comparison accordingly.
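For illustration, here is a minimal, self-contained sketch of the two comparison modes the fix switches between; `columnMatches` is a hypothetical helper written only for this explanation (not code from the patch), and `isCaseSensitive` mirrors the flag read from the configuration in the diff below.
```scala
// Hypothetical helper illustrating the two comparison modes; not part of the patch.
def columnMatches(filterCol: String, partitionCol: String, isCaseSensitive: Boolean): Boolean = {
  if (isCaseSensitive) {
    filterCol == partitionCol
  } else {
    filterCol.equalsIgnoreCase(partitionCol)
  }
}

columnMatches("B", "b", isCaseSensitive = false) // true: the filter on B can prune partition column b
columnMatches("B", "b", isCaseSensitive = true)  // false: an exact-case match is required
```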
## How was this patch tested?
Pass the Jenkins test with a modified test.
Author: Dongjoon Hyun <do...@apache.org>
Closes #14970 from dongjoon-hyun/SPARK-11301.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/958039a1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/958039a1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/958039a1
Branch: refs/heads/branch-1.6
Commit: 958039a14e93bb4bab6074ab11d3b168fd2e023e
Parents: 21be94b
Author: Dongjoon Hyun <do...@apache.org>
Authored: Tue Sep 6 19:36:12 2016 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Sep 6 19:36:12 2016 +0800
----------------------------------------------------------------------
.../datasources/DataSourceStrategy.scala | 19 ++++++++++++++++---
.../org/apache/spark/sql/DataFrameSuite.scala | 3 ++-
2 files changed, 18 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/958039a1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 3741a9c..aa502c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -255,9 +255,18 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
predicates: Seq[Expression],
partitionSpec: PartitionSpec): Seq[Partition] = {
val PartitionSpec(partitionColumns, partitions) = partitionSpec
- val partitionColumnNames = partitionColumns.map(_.name).toSet
+ val isCaseSensitive = SQLContext.getActive().get.conf.caseSensitiveAnalysis
+ val partitionColumnNames = if (isCaseSensitive) {
+ partitionColumns.map(_.name).toSet
+ } else {
+ partitionColumns.map(_.name.toLowerCase).toSet
+ }
val partitionPruningPredicates = predicates.filter {
- _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
+ if (isCaseSensitive) {
+ _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
+ } else {
+ _.references.map(_.name.toLowerCase).toSet.subsetOf(partitionColumnNames)
+ }
}
if (partitionPruningPredicates.nonEmpty) {
@@ -268,7 +277,11 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
val boundPredicate = InterpretedPredicate.create(predicate.transform {
case a: AttributeReference =>
- val index = partitionColumns.indexWhere(a.name == _.name)
+ val index = if (isCaseSensitive) {
+ partitionColumns.indexWhere(a.name == _.name)
+ } else {
+ partitionColumns.indexWhere(c => a.name.equalsIgnoreCase(c.name))
+ }
BoundReference(index, partitionColumns(index).dataType, nullable = true)
})
http://git-wip-us.apache.org/repos/asf/spark/blob/958039a1/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 06436a1..542e4c2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1013,7 +1013,8 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
test("SPARK-11301: fix case sensitivity for filter on partitioned columns") {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
withTempPath { path =>
- Seq(2012 -> "a").toDF("year", "val").write.partitionBy("year").parquet(path.getAbsolutePath)
+ Seq(2012 -> "a", 1999 -> "b").toDF("year", "val").write.partitionBy("year")
+ .parquet(path.getAbsolutePath)
val df = sqlContext.read.parquet(path.getAbsolutePath)
checkAnswer(df.filter($"yEAr" > 2000).select($"val"), Row("a"))
}
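As a usage note, the following is a minimal sketch of the same scenario the modified test covers, assuming an existing 1.6-era `SQLContext` named `sqlContext` and a writable local path `/tmp/spark-11301-demo` (both assumptions, not taken from the patch): write a Parquet table partitioned by `year`, then filter on the partition column with a differently-cased name while `spark.sql.caseSensitive` is `false`.
```scala
// Sketch only: assumes an existing SQLContext (sqlContext) and a writable local path.
import sqlContext.implicits._

sqlContext.setConf("spark.sql.caseSensitive", "false")

Seq(2012 -> "a", 1999 -> "b").toDF("year", "val")
  .write.partitionBy("year").parquet("/tmp/spark-11301-demo")

val df = sqlContext.read.parquet("/tmp/spark-11301-demo")
// With this fix, the mixed-case reference "yEAr" is matched against the
// partition column "year", so only the 2012 partition survives the filter.
df.filter($"yEAr" > 2000).select($"val").show() // expected: a single row containing "a"
```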