You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/10/13 16:35:19 UTC

spark git commit: [SPARK-21247][SQL] Type comparison should respect case-sensitive SQL conf

Repository: spark
Updated Branches:
  refs/heads/master e6e36004a -> 6412ea175


[SPARK-21247][SQL] Type comparison should respect case-sensitive SQL conf

## What changes were proposed in this pull request?

This is an effort to reduce the difference between Hive and Spark. Spark supports case sensitivity in column names. In particular, for struct types, with `spark.sql.caseSensitive=true`, the following is supported.

```scala
scala> sql("select named_struct('a', 1, 'A', 2).a").show
+--------------------------+
|named_struct(a, 1, A, 2).a|
+--------------------------+
|                         1|
+--------------------------+

scala> sql("select named_struct('a', 1, 'A', 2).A").show
+--------------------------+
|named_struct(a, 1, A, 2).A|
+--------------------------+
|                         2|
+--------------------------+
```

Conversely, with `spark.sql.caseSensitive=false`, the following is supported.
```scala
scala> sql("select named_struct('a', 1).A, named_struct('A', 1).a").show
+--------------------+--------------------+
|named_struct(a, 1).A|named_struct(A, 1).a|
+--------------------+--------------------+
|                   1|                   1|
+--------------------+--------------------+
```

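However, struct types whose field names differ only in case are always considered different types, even when `spark.sql.caseSensitive=false`. For example, set operations fail.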
```scala
scala> sql("SELECT named_struct('a',1) union all (select named_struct('A',2))").show
org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types. struct<A:int> <> struct<a:int> at the first column of the second table;;
'Union
:- Project [named_struct(a, 1) AS named_struct(a, 1)#57]
:  +- OneRowRelation$
+- Project [named_struct(A, 2) AS named_struct(A, 2)#58]
   +- OneRowRelation$
```

This PR adds case-insensitive type equality. For example, the set operation above succeeds when `spark.sql.caseSensitive=false`.

```scala
scala> sql("SELECT named_struct('a',1) union all (select named_struct('A',2))").show
+------------------+
|named_struct(a, 1)|
+------------------+
|               [1]|
|               [2]|
+------------------+
```

## How was this patch tested?

Passed Jenkins with newly added test cases.

Author: Dongjoon Hyun <do...@apache.org>

Closes #18460 from dongjoon-hyun/SPARK-21247.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6412ea17
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6412ea17
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6412ea17

Branch: refs/heads/master
Commit: 6412ea1759d39a2380c572ec24cfd8ae4f2d81f7
Parents: e6e3600
Author: Dongjoon Hyun <do...@apache.org>
Authored: Sat Oct 14 00:35:12 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Sat Oct 14 00:35:12 2017 +0800

----------------------------------------------------------------------
 .../sql/catalyst/analysis/TypeCoercion.scala    | 10 ++++
 .../org/apache/spark/sql/types/DataType.scala   |  7 ++-
 .../catalyst/analysis/TypeCoercionSuite.scala   | 52 ++++++++++++++++++--
 .../org/apache/spark/sql/SQLQuerySuite.scala    | 38 ++++++++++++++
 4 files changed, 102 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6412ea17/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
index 9ffe646..532d22d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
@@ -100,6 +100,16 @@ object TypeCoercion {
     case (_: TimestampType, _: DateType) | (_: DateType, _: TimestampType) =>
       Some(TimestampType)
 
+    case (t1 @ StructType(fields1), t2 @ StructType(fields2)) if t1.sameType(t2) =>
+      Some(StructType(fields1.zip(fields2).map { case (f1, f2) =>
+        // Since `t1.sameType(t2)` is true, two StructTypes have the same DataType
+        // except `name` (in case of `spark.sql.caseSensitive=false`) and `nullable`.
+        // - Different names: use f1.name
+        // - Different nullabilities: `nullable` is true iff one of them is nullable.
+        val dataType = findTightestCommonType(f1.dataType, f2.dataType).get
+        StructField(f1.name, dataType, nullable = f1.nullable || f2.nullable)
+      }))
+
     case _ => None
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6412ea17/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
index 30745c6..d6e0df1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
@@ -26,6 +26,7 @@ import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.Utils
 
 /**
@@ -80,7 +81,11 @@ abstract class DataType extends AbstractDataType {
    * (`StructField.nullable`, `ArrayType.containsNull`, and `MapType.valueContainsNull`).
    */
   private[spark] def sameType(other: DataType): Boolean =
-    DataType.equalsIgnoreNullability(this, other)
+    if (SQLConf.get.caseSensitiveAnalysis) {
+      DataType.equalsIgnoreNullability(this, other)
+    } else {
+      DataType.equalsIgnoreCaseAndNullability(this, other)
+    }
 
   /**
    * Returns the same data type but set all nullability fields are true

http://git-wip-us.apache.org/repos/asf/spark/blob/6412ea17/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
index d62e3b6..793e04f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
@@ -131,14 +131,17 @@ class TypeCoercionSuite extends AnalysisTest {
       widenFunc: (DataType, DataType) => Option[DataType],
       t1: DataType,
       t2: DataType,
-      expected: Option[DataType]): Unit = {
+      expected: Option[DataType],
+      isSymmetric: Boolean = true): Unit = {
     var found = widenFunc(t1, t2)
     assert(found == expected,
       s"Expected $expected as wider common type for $t1 and $t2, found $found")
     // Test both directions to make sure the widening is symmetric.
-    found = widenFunc(t2, t1)
-    assert(found == expected,
-      s"Expected $expected as wider common type for $t2 and $t1, found $found")
+    if (isSymmetric) {
+      found = widenFunc(t2, t1)
+      assert(found == expected,
+        s"Expected $expected as wider common type for $t2 and $t1, found $found")
+    }
   }
 
   test("implicit type cast - ByteType") {
@@ -385,6 +388,47 @@ class TypeCoercionSuite extends AnalysisTest {
     widenTest(NullType, StructType(Seq()), Some(StructType(Seq())))
     widenTest(StringType, MapType(IntegerType, StringType, true), None)
     widenTest(ArrayType(IntegerType), StructType(Seq()), None)
+
+    widenTest(
+      StructType(Seq(StructField("a", IntegerType))),
+      StructType(Seq(StructField("b", IntegerType))),
+      None)
+    widenTest(
+      StructType(Seq(StructField("a", IntegerType, nullable = false))),
+      StructType(Seq(StructField("a", DoubleType, nullable = false))),
+      None)
+
+    widenTest(
+      StructType(Seq(StructField("a", IntegerType, nullable = false))),
+      StructType(Seq(StructField("a", IntegerType, nullable = false))),
+      Some(StructType(Seq(StructField("a", IntegerType, nullable = false)))))
+    widenTest(
+      StructType(Seq(StructField("a", IntegerType, nullable = false))),
+      StructType(Seq(StructField("a", IntegerType, nullable = true))),
+      Some(StructType(Seq(StructField("a", IntegerType, nullable = true)))))
+    widenTest(
+      StructType(Seq(StructField("a", IntegerType, nullable = true))),
+      StructType(Seq(StructField("a", IntegerType, nullable = false))),
+      Some(StructType(Seq(StructField("a", IntegerType, nullable = true)))))
+    widenTest(
+      StructType(Seq(StructField("a", IntegerType, nullable = true))),
+      StructType(Seq(StructField("a", IntegerType, nullable = true))),
+      Some(StructType(Seq(StructField("a", IntegerType, nullable = true)))))
+
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+      widenTest(
+        StructType(Seq(StructField("a", IntegerType))),
+        StructType(Seq(StructField("A", IntegerType))),
+        None)
+    }
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+      checkWidenType(
+        TypeCoercion.findTightestCommonType,
+        StructType(Seq(StructField("a", IntegerType), StructField("B", IntegerType))),
+        StructType(Seq(StructField("A", IntegerType), StructField("b", IntegerType))),
+        Some(StructType(Seq(StructField("a", IntegerType), StructField("B", IntegerType)))),
+        isSymmetric = false)
+    }
   }
 
   test("wider common type for decimal and array") {

http://git-wip-us.apache.org/repos/asf/spark/blob/6412ea17/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 93a7777..f0c58e2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2646,6 +2646,44 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("SPARK-21247: Allow case-insensitive type equality in Set operation") {
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+      sql("SELECT struct(1 a) UNION ALL (SELECT struct(2 A))")
+      sql("SELECT struct(1 a) EXCEPT (SELECT struct(2 A))")
+
+      withTable("t", "S") {
+        sql("CREATE TABLE t(c struct<f:int>) USING parquet")
+        sql("CREATE TABLE S(C struct<F:int>) USING parquet")
+        Seq(("c", "C"), ("C", "c"), ("c.f", "C.F"), ("C.F", "c.f")).foreach {
+          case (left, right) =>
+            checkAnswer(sql(s"SELECT * FROM t, S WHERE t.$left = S.$right"), Seq.empty)
+        }
+      }
+    }
+
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+      val m1 = intercept[AnalysisException] {
+        sql("SELECT struct(1 a) UNION ALL (SELECT struct(2 A))")
+      }.message
+      assert(m1.contains("Union can only be performed on tables with the compatible column types"))
+
+      val m2 = intercept[AnalysisException] {
+        sql("SELECT struct(1 a) EXCEPT (SELECT struct(2 A))")
+      }.message
+      assert(m2.contains("Except can only be performed on tables with the compatible column types"))
+
+      withTable("t", "S") {
+        sql("CREATE TABLE t(c struct<f:int>) USING parquet")
+        sql("CREATE TABLE S(C struct<F:int>) USING parquet")
+        checkAnswer(sql("SELECT * FROM t, S WHERE t.c.f = S.C.F"), Seq.empty)
+        val m = intercept[AnalysisException] {
+          sql("SELECT * FROM t, S WHERE c = C")
+        }.message
+        assert(m.contains("cannot resolve '(t.`c` = S.`C`)' due to data type mismatch"))
+      }
+    }
+  }
+
   test("SPARK-21335: support un-aliased subquery") {
     withTempView("v") {
       Seq(1 -> "a").toDF("i", "j").createOrReplaceTempView("v")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org