Posted to commits@spark.apache.org by li...@apache.org on 2018/03/25 23:38:59 UTC

spark git commit: [SPARK-23549][SQL] Cast to timestamp when comparing timestamp with date

Repository: spark
Updated Branches:
  refs/heads/master 5f653d4f7 -> e4bec7cb8


[SPARK-23549][SQL] Cast to timestamp when comparing timestamp with date

## What changes were proposed in this pull request?

This PR fixes an incorrect SQL comparison between a timestamp and a date: both sides were cast to `string` and then compared lexicographically. Under that implementation, the query `spark.sql("select cast('2017-03-01 00:00:00' as timestamp) between cast('2017-02-28' as date) and cast('2017-03-01' as date)").show` returns `false`.

With this PR, the query returns `true` because `date("2017-03-01")` is cast to `timestamp("2017-03-01 00:00:00")` before the comparison.
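
For illustration, the query from the description behaves as follows (a sketch of expected spark-shell behavior, not captured output):

```scala
// The DATE bounds are now promoted to TIMESTAMP, so BETWEEN compares
// timestamps instead of their lexicographic string forms.
spark.sql(
  "select cast('2017-03-01 00:00:00' as timestamp) " +
  "between cast('2017-02-28' as date) and cast('2017-03-01' as date)").show()
// Before this patch: false (string comparison); after this patch: true
```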


## How was this patch tested?

Added new UTs to `TypeCoercionSuite`.

Author: Kazuaki Ishizaki <is...@jp.ibm.com>

Closes #20774 from kiszk/SPARK-23549.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e4bec7cb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e4bec7cb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e4bec7cb

Branch: refs/heads/master
Commit: e4bec7cb88b9ee63f8497e3f9e0ab0bfa5d5a77c
Parents: 5f653d4
Author: Kazuaki Ishizaki <is...@jp.ibm.com>
Authored: Sun Mar 25 16:38:49 2018 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Sun Mar 25 16:38:49 2018 -0700

----------------------------------------------------------------------
 docs/sql-programming-guide.md                   |  1 +
 .../sql/catalyst/analysis/TypeCoercion.scala    | 29 +++++++++-----
 .../org/apache/spark/sql/internal/SQLConf.scala | 13 ++++++
 .../catalyst/analysis/TypeCoercionSuite.scala   | 34 +++++++++++++---
 .../sql-tests/inputs/predicate-functions.sql    |  7 ++++
 .../results/predicate-functions.sql.out         | 42 +++++++++++++++++++-
 6 files changed, 108 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e4bec7cb/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 421e2ea..2b393f3 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1808,6 +1808,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see
  - Since Spark 2.4, writing an empty dataframe to a directory launches at least one write task, even if physically the dataframe has no partition. This introduces a small behavior change that for self-describing file formats like Parquet and Orc, Spark creates a metadata-only file in the target directory when writing a 0-partition dataframe, so that schema inference can still work if users read that directory later. The new behavior is more reasonable and more consistent regarding writing empty dataframe.
  - Since Spark 2.4, expression IDs in UDF arguments do not appear in column names. For example, a column name in Spark 2.4 is not `UDF:f(col0 AS colA#28)` but ``UDF:f(col0 AS `colA`)``.
  - Since Spark 2.4, writing a dataframe with an empty or nested empty schema using any file formats (parquet, orc, json, text, csv etc.) is not allowed. An exception is thrown when attempting to write dataframes with empty schema. 
+ - Since Spark 2.4, Spark compares a DATE type with a TIMESTAMP type after promoting both sides to TIMESTAMP. Setting `spark.sql.typeCoercion.compareDateTimestampInTimestamp` to `false` restores the previous behavior. This option will be removed in Spark 3.0.
 
 ## Upgrading From Spark SQL 2.2 to 2.3
 

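The migration-guide entry above documents the escape hatch. A minimal sketch of opting out at the session level, assuming a live `SparkSession` named `spark`:

```scala
// Restore the pre-2.4 behavior (string-based DATE/TIMESTAMP comparison).
// The flag is internal and slated for removal in Spark 3.0.
spark.conf.set("spark.sql.typeCoercion.compareDateTimestampInTimestamp", "false")
```
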
http://git-wip-us.apache.org/repos/asf/spark/blob/e4bec7cb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
index e8669c4..ec7e776 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
@@ -47,9 +47,9 @@ import org.apache.spark.sql.types._
 object TypeCoercion {
 
   def typeCoercionRules(conf: SQLConf): List[Rule[LogicalPlan]] =
-    InConversion ::
+    InConversion(conf) ::
       WidenSetOperationTypes ::
-      PromoteStrings ::
+      PromoteStrings(conf) ::
       DecimalPrecision ::
       BooleanEquality ::
       FunctionArgumentConversion ::
@@ -127,7 +127,8 @@ object TypeCoercion {
   * is a String and the other is not. It also handles when one op is a Date and the other
   * is a Timestamp, picking TimestampType or StringType per conf.compareDateTimestampInTimestamp.
    */
-  val findCommonTypeForBinaryComparison: (DataType, DataType) => Option[DataType] = {
+  private def findCommonTypeForBinaryComparison(
+      dt1: DataType, dt2: DataType, conf: SQLConf): Option[DataType] = (dt1, dt2) match {
     // We should cast all relative timestamp/date/string comparison into string comparisons
     // This behaves as a user would expect because timestamp strings sort lexicographically.
     // i.e. TimeStamp(2013-01-01 00:00 ...) < "2014" = true
@@ -135,11 +136,17 @@ object TypeCoercion {
     case (DateType, StringType) => Some(StringType)
     case (StringType, TimestampType) => Some(StringType)
     case (TimestampType, StringType) => Some(StringType)
-    case (TimestampType, DateType) => Some(StringType)
-    case (DateType, TimestampType) => Some(StringType)
     case (StringType, NullType) => Some(StringType)
     case (NullType, StringType) => Some(StringType)
 
+    // Cast to TimestampType when we compare DateType with TimestampType
+    // if conf.compareDateTimestampInTimestamp is true
+    // i.e. TimeStamp('2017-03-01 00:00:00') eq Date('2017-03-01') = true
+    case (TimestampType, DateType) =>
+      if (conf.compareDateTimestampInTimestamp) Some(TimestampType) else Some(StringType)
+    case (DateType, TimestampType) =>
+      if (conf.compareDateTimestampInTimestamp) Some(TimestampType) else Some(StringType)
+
     // There is no proper decimal type we can pick,
     // using double type is the best we can do.
     // See SPARK-22469 for details.
@@ -147,7 +154,7 @@ object TypeCoercion {
     case (s: StringType, n: DecimalType) => Some(DoubleType)
 
     case (l: StringType, r: AtomicType) if r != StringType => Some(r)
-    case (l: AtomicType, r: StringType) if (l != StringType) => Some(l)
+    case (l: AtomicType, r: StringType) if l != StringType => Some(l)
     case (l, r) => None
   }
 
@@ -313,7 +320,7 @@ object TypeCoercion {
   /**
    * Promotes strings that appear in arithmetic expressions.
    */
-  object PromoteStrings extends TypeCoercionRule {
+  case class PromoteStrings(conf: SQLConf) extends TypeCoercionRule {
     private def castExpr(expr: Expression, targetType: DataType): Expression = {
       (expr.dataType, targetType) match {
         case (NullType, dt) => Literal.create(null, targetType)
@@ -342,8 +349,8 @@ object TypeCoercion {
         p.makeCopy(Array(left, Cast(right, TimestampType)))
 
       case p @ BinaryComparison(left, right)
-        if findCommonTypeForBinaryComparison(left.dataType, right.dataType).isDefined =>
-        val commonType = findCommonTypeForBinaryComparison(left.dataType, right.dataType).get
+          if findCommonTypeForBinaryComparison(left.dataType, right.dataType, conf).isDefined =>
+        val commonType = findCommonTypeForBinaryComparison(left.dataType, right.dataType, conf).get
         p.makeCopy(Array(castExpr(left, commonType), castExpr(right, commonType)))
 
       case Abs(e @ StringType()) => Abs(Cast(e, DoubleType))
@@ -374,7 +381,7 @@ object TypeCoercion {
    *    operator type is found the original expression will be returned and an
    *    Analysis Exception will be raised at the type checking phase.
    */
-  object InConversion extends TypeCoercionRule {
+  case class InConversion(conf: SQLConf) extends TypeCoercionRule {
     private def flattenExpr(expr: Expression): Seq[Expression] = {
       expr match {
         // Multi columns in IN clause is represented as a CreateNamedStruct.
@@ -400,7 +407,7 @@ object TypeCoercion {
         val rhs = sub.output
 
         val commonTypes = lhs.zip(rhs).flatMap { case (l, r) =>
-          findCommonTypeForBinaryComparison(l.dataType, r.dataType)
+          findCommonTypeForBinaryComparison(l.dataType, r.dataType, conf)
             .orElse(findTightestCommonType(l.dataType, r.dataType))
         }
 

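Taken together, the hunks above thread the `SQLConf` through the rules and add the Date/Timestamp branch. A hedged, self-contained model of the resulting coercion logic (the types and function below are simplified stand-ins, not the actual Catalyst code):

```scala
// Simplified model of findCommonTypeForBinaryComparison after this patch.
sealed trait DataType
case object StringType extends DataType
case object DateType extends DataType
case object TimestampType extends DataType

def commonTypeForComparison(
    dt1: DataType, dt2: DataType, compareDateTimestampInTimestamp: Boolean): Option[DataType] =
  (dt1, dt2) match {
    // The fix: Date vs. Timestamp promotes to Timestamp when the flag is on,
    // and falls back to the old string comparison when it is off.
    case (TimestampType, DateType) | (DateType, TimestampType) =>
      if (compareDateTimestampInTimestamp) Some(TimestampType) else Some(StringType)
    // String vs. Date/Timestamp still compares as strings, as before.
    case (StringType, DateType) | (DateType, StringType) |
         (StringType, TimestampType) | (TimestampType, StringType) => Some(StringType)
    case _ => None
  }

// commonTypeForComparison(DateType, TimestampType, true)   // Some(TimestampType)
// commonTypeForComparison(DateType, TimestampType, false)  // Some(StringType)
```
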
http://git-wip-us.apache.org/repos/asf/spark/blob/e4bec7cb/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 11864bd..9cb03b5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -479,6 +479,16 @@ object SQLConf {
     .checkValues(HiveCaseSensitiveInferenceMode.values.map(_.toString))
     .createWithDefault(HiveCaseSensitiveInferenceMode.INFER_AND_SAVE.toString)
 
+  val TYPECOERCION_COMPARE_DATE_TIMESTAMP_IN_TIMESTAMP =
+    buildConf("spark.sql.typeCoercion.compareDateTimestampInTimestamp")
+      .internal()
+      .doc("When true (default), compare Date with Timestamp after converting both sides to " +
+        "Timestamp. This behavior is compatible with Hive 2.2 or later. See HIVE-15236. " +
+        "When false, restore the behavior prior to Spark 2.4. Compare Date with Timestamp after " +
+        "converting both sides to string. This config will be removed in spark 3.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val OPTIMIZER_METADATA_ONLY = buildConf("spark.sql.optimizer.metadataOnly")
     .doc("When true, enable the metadata-only query optimization that use the table's metadata " +
       "to produce the partition columns instead of table scans. It applies when all the columns " +
@@ -1332,6 +1342,9 @@ class SQLConf extends Serializable with Logging {
   def caseSensitiveInferenceMode: HiveCaseSensitiveInferenceMode.Value =
     HiveCaseSensitiveInferenceMode.withName(getConf(HIVE_CASE_SENSITIVE_INFERENCE))
 
+  def compareDateTimestampInTimestamp: Boolean =
+    getConf(TYPECOERCION_COMPARE_DATE_TIMESTAMP_IN_TIMESTAMP)
+
   def gatherFastStats: Boolean = getConf(GATHER_FASTSTAT)
 
   def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY)

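For readers unfamiliar with Spark's conf machinery, a hedged, simplified model of the entry-plus-getter pattern the hunk above follows (`ConfigEntry` and `MiniConf` here are illustrative stand-ins, not Spark's classes):

```scala
// Simplified stand-in for SQLConf's builder pattern: a typed entry with a
// default, plus a getter that falls back to that default when the key is unset.
final case class ConfigEntry[T](key: String, default: T, doc: String)

class MiniConf(settings: Map[String, String] = Map.empty) {
  def getConf(entry: ConfigEntry[Boolean]): Boolean =
    settings.get(entry.key).map(_.toBoolean).getOrElse(entry.default)
}

val compareInTimestamp = ConfigEntry(
  key = "spark.sql.typeCoercion.compareDateTimestampInTimestamp",
  default = true,
  doc = "When true, compare Date with Timestamp after promoting both sides to Timestamp.")

// Defaults apply when the key is unset; an explicit setting overrides them.
assert(new MiniConf().getConf(compareInTimestamp))
assert(!new MiniConf(Map(compareInTimestamp.key -> "false")).getConf(compareInTimestamp))
```
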
http://git-wip-us.apache.org/repos/asf/spark/blob/e4bec7cb/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
index 52a7ebd..8ac49dc 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
@@ -1207,7 +1207,7 @@ class TypeCoercionSuite extends AnalysisTest {
    */
   test("make sure rules do not fire early") {
     // InConversion
-    val inConversion = TypeCoercion.InConversion
+    val inConversion = TypeCoercion.InConversion(conf)
     ruleTest(inConversion,
       In(UnresolvedAttribute("a"), Seq(Literal(1))),
       In(UnresolvedAttribute("a"), Seq(Literal(1)))
@@ -1251,18 +1251,40 @@ class TypeCoercionSuite extends AnalysisTest {
   }
 
   test("binary comparison with string promotion") {
-    ruleTest(PromoteStrings,
+    val rule = TypeCoercion.PromoteStrings(conf)
+    ruleTest(rule,
       GreaterThan(Literal("123"), Literal(1)),
       GreaterThan(Cast(Literal("123"), IntegerType), Literal(1)))
-    ruleTest(PromoteStrings,
+    ruleTest(rule,
       LessThan(Literal(true), Literal("123")),
       LessThan(Literal(true), Cast(Literal("123"), BooleanType)))
-    ruleTest(PromoteStrings,
+    ruleTest(rule,
       EqualTo(Literal(Array(1, 2)), Literal("123")),
       EqualTo(Literal(Array(1, 2)), Literal("123")))
-    ruleTest(PromoteStrings,
+    ruleTest(rule,
       GreaterThan(Literal("1.5"), Literal(BigDecimal("0.5"))),
-      GreaterThan(Cast(Literal("1.5"), DoubleType), Cast(Literal(BigDecimal("0.5")), DoubleType)))
+      GreaterThan(Cast(Literal("1.5"), DoubleType), Cast(Literal(BigDecimal("0.5")),
+        DoubleType)))
+    Seq(true, false).foreach { convertToTS =>
+      withSQLConf(
+        "spark.sql.typeCoercion.compareDateTimestampInTimestamp" -> convertToTS.toString) {
+        val date0301 = Literal(java.sql.Date.valueOf("2017-03-01"))
+        val timestamp0301000000 = Literal(Timestamp.valueOf("2017-03-01 00:00:00"))
+        val timestamp0301000001 = Literal(Timestamp.valueOf("2017-03-01 00:00:01"))
+        if (convertToTS) {
+          // `Date` should be treated as a timestamp at 00:00:00. See SPARK-23549.
+          ruleTest(rule, EqualTo(date0301, timestamp0301000000),
+            EqualTo(Cast(date0301, TimestampType), timestamp0301000000))
+          ruleTest(rule, LessThan(date0301, timestamp0301000001),
+            LessThan(Cast(date0301, TimestampType), timestamp0301000001))
+        } else {
+          ruleTest(rule, LessThan(date0301, timestamp0301000000),
+            LessThan(Cast(date0301, StringType), Cast(timestamp0301000000, StringType)))
+          ruleTest(rule, LessThan(date0301, timestamp0301000001),
+            LessThan(Cast(date0301, StringType), Cast(timestamp0301000001, StringType)))
+        }
+      }
+    }
   }
 
   test("cast WindowFrame boundaries to the type they operate upon") {

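The new unit tests pin down both branches of the flag. An equivalent hedged spark-shell check (expected values per this patch, not captured output):

```scala
// With the flag on (the default), the DATE side is promoted to TIMESTAMP.
spark.conf.set("spark.sql.typeCoercion.compareDateTimestampInTimestamp", "true")
spark.sql("select to_date('2017-03-01') = to_timestamp('2017-03-01 00:00:00')").show()
// true: DATE '2017-03-01' becomes TIMESTAMP '2017-03-01 00:00:00'

// With the flag off, both sides are cast to string, as before Spark 2.4.
spark.conf.set("spark.sql.typeCoercion.compareDateTimestampInTimestamp", "false")
spark.sql("select to_date('2017-03-01') = to_timestamp('2017-03-01 00:00:00')").show()
// false: '2017-03-01' != '2017-03-01 00:00:00' as strings
```
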
http://git-wip-us.apache.org/repos/asf/spark/blob/e4bec7cb/sql/core/src/test/resources/sql-tests/inputs/predicate-functions.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/predicate-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/predicate-functions.sql
index e99d5ce..fadb4bb 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/predicate-functions.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/predicate-functions.sql
@@ -39,3 +39,10 @@ select 2.0 <= '2.2';
 select 0.5 <= '1.5';
 select to_date('2009-07-30 04:17:52') <= to_date('2009-07-30 04:17:52');
 select to_date('2009-07-30 04:17:52') <= '2009-07-30 04:17:52';
+
+-- SPARK-23549: Cast to timestamp when comparing timestamp with date
+select to_date('2017-03-01') = to_timestamp('2017-03-01 00:00:00');
+select to_timestamp('2017-03-01 00:00:01') > to_date('2017-03-01');
+select to_timestamp('2017-03-01 00:00:01') >= to_date('2017-03-01');
+select to_date('2017-03-01') < to_timestamp('2017-03-01 00:00:01');
+select to_date('2017-03-01') <= to_timestamp('2017-03-01 00:00:01');

http://git-wip-us.apache.org/repos/asf/spark/blob/e4bec7cb/sql/core/src/test/resources/sql-tests/results/predicate-functions.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/predicate-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/predicate-functions.sql.out
index d51f6d3..cf828c6 100644
--- a/sql/core/src/test/resources/sql-tests/results/predicate-functions.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/predicate-functions.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 32
+-- Number of queries: 37
 
 
 -- !query 0
@@ -256,3 +256,43 @@ select to_date('2009-07-30 04:17:52') <= '2009-07-30 04:17:52'
 struct<(CAST(to_date('2009-07-30 04:17:52') AS STRING) <= 2009-07-30 04:17:52):boolean>
 -- !query 31 output
 true
+
+
+-- !query 32
+select to_date('2017-03-01') = to_timestamp('2017-03-01 00:00:00')
+-- !query 32 schema
+struct<(CAST(to_date('2017-03-01') AS TIMESTAMP) = to_timestamp('2017-03-01 00:00:00')):boolean>
+-- !query 32 output
+true
+
+
+-- !query 33
+select to_timestamp('2017-03-01 00:00:01') > to_date('2017-03-01')
+-- !query 33 schema
+struct<(to_timestamp('2017-03-01 00:00:01') > CAST(to_date('2017-03-01') AS TIMESTAMP)):boolean>
+-- !query 33 output
+true
+
+
+-- !query 34
+select to_timestamp('2017-03-01 00:00:01') >= to_date('2017-03-01')
+-- !query 34 schema
+struct<(to_timestamp('2017-03-01 00:00:01') >= CAST(to_date('2017-03-01') AS TIMESTAMP)):boolean>
+-- !query 34 output
+true
+
+
+-- !query 35
+select to_date('2017-03-01') < to_timestamp('2017-03-01 00:00:01')
+-- !query 35 schema
+struct<(CAST(to_date('2017-03-01') AS TIMESTAMP) < to_timestamp('2017-03-01 00:00:01')):boolean>
+-- !query 35 output
+true
+
+
+-- !query 36
+select to_date('2017-03-01') <= to_timestamp('2017-03-01 00:00:01')
+-- !query 36 schema
+struct<(CAST(to_date('2017-03-01') AS TIMESTAMP) <= to_timestamp('2017-03-01 00:00:01')):boolean>
+-- !query 36 output
+true


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org