You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/10/13 16:35:19 UTC
spark git commit: [SPARK-21247][SQL] Type comparison should respect
case-sensitive SQL conf
Repository: spark
Updated Branches:
refs/heads/master e6e36004a -> 6412ea175
[SPARK-21247][SQL] Type comparison should respect case-sensitive SQL conf
## What changes were proposed in this pull request?
This is an effort to reduce the difference between Hive and Spark. Spark supports case-sensitivity in columns. Especially, for Struct types, with `spark.sql.caseSensitive=true`, the following is supported.
```scala
scala> sql("select named_struct('a', 1, 'A', 2).a").show
+--------------------------+
|named_struct(a, 1, A, 2).a|
+--------------------------+
| 1|
+--------------------------+
scala> sql("select named_struct('a', 1, 'A', 2).A").show
+--------------------------+
|named_struct(a, 1, A, 2).A|
+--------------------------+
| 2|
+--------------------------+
```
And vice versa, with `spark.sql.caseSensitive=false`, the following is supported.
```scala
scala> sql("select named_struct('a', 1).A, named_struct('A', 1).a").show
+--------------------+--------------------+
|named_struct(a, 1).A|named_struct(A, 1).a|
+--------------------+--------------------+
| 1| 1|
+--------------------+--------------------+
```
However, types are considered different. For example, SET operations fail.
```scala
scala> sql("SELECT named_struct('a',1) union all (select named_struct('A',2))").show
org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types. struct<A:int> <> struct<a:int> at the first column of the second table;;
'Union
:- Project [named_struct(a, 1) AS named_struct(a, 1)#57]
: +- OneRowRelation$
+- Project [named_struct(A, 2) AS named_struct(A, 2)#58]
+- OneRowRelation$
```
This PR aims to support case-insensitive type equality. For example, in SET operations, the above query succeeds when `spark.sql.caseSensitive=false`.
```scala
scala> sql("SELECT named_struct('a',1) union all (select named_struct('A',2))").show
+------------------+
|named_struct(a, 1)|
+------------------+
| [1]|
| [2]|
+------------------+
```
## How was this patch tested?
Pass the Jenkins with a newly added test case.
Author: Dongjoon Hyun <do...@apache.org>
Closes #18460 from dongjoon-hyun/SPARK-21247.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6412ea17
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6412ea17
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6412ea17
Branch: refs/heads/master
Commit: 6412ea1759d39a2380c572ec24cfd8ae4f2d81f7
Parents: e6e3600
Author: Dongjoon Hyun <do...@apache.org>
Authored: Sat Oct 14 00:35:12 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Sat Oct 14 00:35:12 2017 +0800
----------------------------------------------------------------------
.../sql/catalyst/analysis/TypeCoercion.scala | 10 ++++
.../org/apache/spark/sql/types/DataType.scala | 7 ++-
.../catalyst/analysis/TypeCoercionSuite.scala | 52 ++++++++++++++++++--
.../org/apache/spark/sql/SQLQuerySuite.scala | 38 ++++++++++++++
4 files changed, 102 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/6412ea17/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
index 9ffe646..532d22d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
@@ -100,6 +100,16 @@ object TypeCoercion {
case (_: TimestampType, _: DateType) | (_: DateType, _: TimestampType) =>
Some(TimestampType)
+ case (t1 @ StructType(fields1), t2 @ StructType(fields2)) if t1.sameType(t2) =>
+ Some(StructType(fields1.zip(fields2).map { case (f1, f2) =>
+ // Since `t1.sameType(t2)` is true, two StructTypes have the same DataType
+ // except `name` (in case of `spark.sql.caseSensitive=false`) and `nullable`.
+ // - Different names: use f1.name
+ // - Different nullabilities: `nullable` is true iff one of them is nullable.
+ val dataType = findTightestCommonType(f1.dataType, f2.dataType).get
+ StructField(f1.name, dataType, nullable = f1.nullable || f2.nullable)
+ }))
+
case _ => None
}
http://git-wip-us.apache.org/repos/asf/spark/blob/6412ea17/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
index 30745c6..d6e0df1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
@@ -26,6 +26,7 @@ import org.json4s.jackson.JsonMethods._
import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.Utils
/**
@@ -80,7 +81,11 @@ abstract class DataType extends AbstractDataType {
* (`StructField.nullable`, `ArrayType.containsNull`, and `MapType.valueContainsNull`).
*/
private[spark] def sameType(other: DataType): Boolean =
- DataType.equalsIgnoreNullability(this, other)
+ if (SQLConf.get.caseSensitiveAnalysis) {
+ DataType.equalsIgnoreNullability(this, other)
+ } else {
+ DataType.equalsIgnoreCaseAndNullability(this, other)
+ }
/**
* Returns the same data type but set all nullability fields are true
http://git-wip-us.apache.org/repos/asf/spark/blob/6412ea17/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
index d62e3b6..793e04f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
@@ -131,14 +131,17 @@ class TypeCoercionSuite extends AnalysisTest {
widenFunc: (DataType, DataType) => Option[DataType],
t1: DataType,
t2: DataType,
- expected: Option[DataType]): Unit = {
+ expected: Option[DataType],
+ isSymmetric: Boolean = true): Unit = {
var found = widenFunc(t1, t2)
assert(found == expected,
s"Expected $expected as wider common type for $t1 and $t2, found $found")
// Test both directions to make sure the widening is symmetric.
- found = widenFunc(t2, t1)
- assert(found == expected,
- s"Expected $expected as wider common type for $t2 and $t1, found $found")
+ if (isSymmetric) {
+ found = widenFunc(t2, t1)
+ assert(found == expected,
+ s"Expected $expected as wider common type for $t2 and $t1, found $found")
+ }
}
test("implicit type cast - ByteType") {
@@ -385,6 +388,47 @@ class TypeCoercionSuite extends AnalysisTest {
widenTest(NullType, StructType(Seq()), Some(StructType(Seq())))
widenTest(StringType, MapType(IntegerType, StringType, true), None)
widenTest(ArrayType(IntegerType), StructType(Seq()), None)
+
+ widenTest(
+ StructType(Seq(StructField("a", IntegerType))),
+ StructType(Seq(StructField("b", IntegerType))),
+ None)
+ widenTest(
+ StructType(Seq(StructField("a", IntegerType, nullable = false))),
+ StructType(Seq(StructField("a", DoubleType, nullable = false))),
+ None)
+
+ widenTest(
+ StructType(Seq(StructField("a", IntegerType, nullable = false))),
+ StructType(Seq(StructField("a", IntegerType, nullable = false))),
+ Some(StructType(Seq(StructField("a", IntegerType, nullable = false)))))
+ widenTest(
+ StructType(Seq(StructField("a", IntegerType, nullable = false))),
+ StructType(Seq(StructField("a", IntegerType, nullable = true))),
+ Some(StructType(Seq(StructField("a", IntegerType, nullable = true)))))
+ widenTest(
+ StructType(Seq(StructField("a", IntegerType, nullable = true))),
+ StructType(Seq(StructField("a", IntegerType, nullable = false))),
+ Some(StructType(Seq(StructField("a", IntegerType, nullable = true)))))
+ widenTest(
+ StructType(Seq(StructField("a", IntegerType, nullable = true))),
+ StructType(Seq(StructField("a", IntegerType, nullable = true))),
+ Some(StructType(Seq(StructField("a", IntegerType, nullable = true)))))
+
+ withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+ widenTest(
+ StructType(Seq(StructField("a", IntegerType))),
+ StructType(Seq(StructField("A", IntegerType))),
+ None)
+ }
+ withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+ checkWidenType(
+ TypeCoercion.findTightestCommonType,
+ StructType(Seq(StructField("a", IntegerType), StructField("B", IntegerType))),
+ StructType(Seq(StructField("A", IntegerType), StructField("b", IntegerType))),
+ Some(StructType(Seq(StructField("a", IntegerType), StructField("B", IntegerType)))),
+ isSymmetric = false)
+ }
}
test("wider common type for decimal and array") {
http://git-wip-us.apache.org/repos/asf/spark/blob/6412ea17/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 93a7777..f0c58e2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2646,6 +2646,44 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
}
+ test("SPARK-21247: Allow case-insensitive type equality in Set operation") {
+ withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+ sql("SELECT struct(1 a) UNION ALL (SELECT struct(2 A))")
+ sql("SELECT struct(1 a) EXCEPT (SELECT struct(2 A))")
+
+ withTable("t", "S") {
+ sql("CREATE TABLE t(c struct<f:int>) USING parquet")
+ sql("CREATE TABLE S(C struct<F:int>) USING parquet")
+ Seq(("c", "C"), ("C", "c"), ("c.f", "C.F"), ("C.F", "c.f")).foreach {
+ case (left, right) =>
+ checkAnswer(sql(s"SELECT * FROM t, S WHERE t.$left = S.$right"), Seq.empty)
+ }
+ }
+ }
+
+ withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+ val m1 = intercept[AnalysisException] {
+ sql("SELECT struct(1 a) UNION ALL (SELECT struct(2 A))")
+ }.message
+ assert(m1.contains("Union can only be performed on tables with the compatible column types"))
+
+ val m2 = intercept[AnalysisException] {
+ sql("SELECT struct(1 a) EXCEPT (SELECT struct(2 A))")
+ }.message
+ assert(m2.contains("Except can only be performed on tables with the compatible column types"))
+
+ withTable("t", "S") {
+ sql("CREATE TABLE t(c struct<f:int>) USING parquet")
+ sql("CREATE TABLE S(C struct<F:int>) USING parquet")
+ checkAnswer(sql("SELECT * FROM t, S WHERE t.c.f = S.C.F"), Seq.empty)
+ val m = intercept[AnalysisException] {
+ sql("SELECT * FROM t, S WHERE c = C")
+ }.message
+ assert(m.contains("cannot resolve '(t.`c` = S.`C`)' due to data type mismatch"))
+ }
+ }
+ }
+
test("SPARK-21335: support un-aliased subquery") {
withTempView("v") {
Seq(1 -> "a").toDF("i", "j").createOrReplaceTempView("v")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org