Posted to commits@spark.apache.org by we...@apache.org on 2021/08/25 06:09:36 UTC

[spark] branch master updated: [SPARK-36567][SQL] Support foldable special datetime strings by `CAST`

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new df0ec56  [SPARK-36567][SQL] Support foldable special datetime strings by `CAST`
df0ec56 is described below

commit df0ec56723f0b47c3629055fa7a8c63bb4285147
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Wed Aug 25 14:08:59 2021 +0800

    [SPARK-36567][SQL] Support foldable special datetime strings by `CAST`
    
    ### What changes were proposed in this pull request?
    In the PR, I propose to add a new correctness rule, `SpecialDatetimeValues`, to the final analysis phase. It replaces casts of foldable string expressions to date/timestamp_ltz/timestamp_ntz with literals of those types when the strings contain special datetime values such as `today`, `yesterday`, and `tomorrow`.
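    
    For illustration, here is a minimal sketch of the rewrite in Catalyst terms (the literal value is illustrative and depends on the current date and the cast's time zone):
    ```scala
    // Before the rule: a cast of a foldable special string to DateType.
    Cast(Literal("tomorrow"), DateType, Some("UTC"))
    // After SpecialDatetimeValues, assuming "today" is 2021-08-24 in UTC:
    // the cast is replaced with a date literal (days since the epoch).
    Literal(18864, DateType)
    ```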
    
    ### Why are the changes needed?
    1. To avoid a breaking change.
    2. To improve the user experience with Spark SQL. After the PR https://github.com/apache/spark/pull/32714, users have to use typed literals instead of implicit casts. For instance, in Spark 3.1 this query worked:
    ```sql
    select ts_col > 'now';
    ```
    but it currently fails, and users have to use a typed timestamp literal:
    ```sql
    select ts_col > timestamp'now';
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    No. The previous release, 3.1, already supported this feature until it was removed by https://github.com/apache/spark/pull/32714.
    
    ### How was this patch tested?
    1. Manually tested via the SQL command line:
    ```sql
    spark-sql> select cast('today' as date);
    2021-08-24
    spark-sql> select timestamp('today');
    2021-08-24 00:00:00
    spark-sql> select timestamp'tomorrow' > 'today';
    true
    ```
    2. By running the new test suite:
    ```
    $ build/sbt "sql/testOnly org.apache.spark.sql.catalyst.optimizer.SpecialDatetimeValuesSuite"
    ```
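    
    A quick way to observe the rewrite from spark-shell (a sketch; the exact plan output is illustrative and depends on the current date):
    ```scala
    // The optimized plan should contain a date literal instead of a cast,
    // because the rule runs in the final analysis batch of the optimizer.
    spark.sql("SELECT CAST('tomorrow' AS DATE)").queryExecution.optimizedPlan
    // e.g. Project [2021-08-25 AS CAST(tomorrow AS DATE)#0]
    ```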
    
    Closes #33816 from MaxGekk/foldable-datetime-special-values.
    
    Authored-by: Max Gekk <ma...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |   4 +-
 .../sql/catalyst/optimizer/finishAnalysis.scala    |  23 +++++
 .../optimizer/SpecialDatetimeValuesSuite.scala     | 101 +++++++++++++++++++++
 3 files changed, 127 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index ea37cbb..ed4ab9c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -158,7 +158,8 @@ abstract class Optimizer(catalogManager: CatalogManager)
       RewriteNonCorrelatedExists,
       PullOutGroupingExpressions,
       ComputeCurrentTime,
-      ReplaceCurrentLike(catalogManager)) ::
+      ReplaceCurrentLike(catalogManager),
+      SpecialDatetimeValues) ::
     //////////////////////////////////////////////////////////////////////////////////////////
     // Optimizer rules start here
     //////////////////////////////////////////////////////////////////////////////////////////
@@ -266,6 +267,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
       EliminateView.ruleName ::
       ReplaceExpressions.ruleName ::
       ComputeCurrentTime.ruleName ::
+      SpecialDatetimeValues.ruleName ::
       ReplaceCurrentLike(catalogManager).ruleName ::
       RewriteDistinctAggregates.ruleName ::
       ReplaceDeduplicateWithAggregate.ruleName ::
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
index deacc3b..daf4c5e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.trees.TreePattern._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, convertSpecialTimestamp, convertSpecialTimestampNTZ}
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -119,3 +120,25 @@ case class ReplaceCurrentLike(catalogManager: CatalogManager) extends Rule[Logic
     }
   }
 }
+
+/**
+ * Replaces casts of special datetime strings with their date/timestamp values
+ * if the input strings are foldable.
+ */
+object SpecialDatetimeValues extends Rule[LogicalPlan] {
+  private val conv = Map[DataType, (String, java.time.ZoneId) => Option[Any]](
+    DateType -> convertSpecialDate,
+    TimestampType -> convertSpecialTimestamp,
+    TimestampNTZType -> ((s: String, _: java.time.ZoneId) => convertSpecialTimestampNTZ(s))
+  )
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.transformAllExpressionsWithPruning(_.containsPattern(CAST)) {
+      case cast @ Cast(e, dt @ (DateType | TimestampType | TimestampNTZType), _, _)
+        if e.foldable && e.dataType == StringType =>
+        Option(e.eval())
+          .flatMap(s => conv(dt)(s.toString, cast.zoneId))
+          .map(Literal(_, dt))
+          .getOrElse(cast)
+    }
+  }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SpecialDatetimeValuesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SpecialDatetimeValuesSuite.scala
new file mode 100644
index 0000000..e68a751
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SpecialDatetimeValuesSuite.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import java.time.{Instant, LocalDate, LocalDateTime, LocalTime, ZoneId}
+
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, Literal}
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_MINUTE
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateTimeToMicros}
+import org.apache.spark.sql.types.{AtomicType, DateType, TimestampNTZType, TimestampType}
+
+class SpecialDatetimeValuesSuite extends PlanTest {
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches = Seq(Batch("SpecialDatetimeValues", Once, SpecialDatetimeValues))
+  }
+
+  test("special date values") {
+    testSpecialDatetimeValues { zoneId =>
+      val expected = Set(
+        LocalDate.ofEpochDay(0),
+        LocalDate.now(zoneId),
+        LocalDate.now(zoneId).minusDays(1),
+        LocalDate.now(zoneId).plusDays(1)
+      ).map(_.toEpochDay.toInt)
+      val in = Project(Seq(
+        Alias(Cast(Literal("epoch"), DateType, Some(zoneId.getId)), "epoch")(),
+        Alias(Cast(Literal("today"), DateType, Some(zoneId.getId)), "today")(),
+        Alias(Cast(Literal("yesterday"), DateType, Some(zoneId.getId)), "yesterday")(),
+        Alias(Cast(Literal("tomorrow"), DateType, Some(zoneId.getId)), "tomorrow")()),
+        LocalRelation())
+
+      val plan = Optimize.execute(in.analyze).asInstanceOf[Project]
+      val lits = new scala.collection.mutable.ArrayBuffer[Int]
+      plan.transformAllExpressions { case e: Literal if e.dataType == DateType =>
+        lits += e.value.asInstanceOf[Int]
+        e
+      }
+      assert(expected === lits.toSet)
+    }
+  }
+
+  private def testSpecialTs(tsType: AtomicType, expected: Set[Long], zoneId: ZoneId): Unit = {
+    val in = Project(Seq(
+      Alias(Cast(Literal("epoch"), tsType, Some(zoneId.getId)), "epoch")(),
+      Alias(Cast(Literal("now"), tsType, Some(zoneId.getId)), "now")(),
+      Alias(Cast(Literal("tomorrow"), tsType, Some(zoneId.getId)), "tomorrow")(),
+      Alias(Cast(Literal("yesterday"), tsType, Some(zoneId.getId)), "yesterday")()),
+      LocalRelation())
+
+    val plan = Optimize.execute(in.analyze).asInstanceOf[Project]
+    val lits = new scala.collection.mutable.ArrayBuffer[Long]
+    plan.transformAllExpressions { case e: Literal if e.dataType == tsType =>
+      lits += e.value.asInstanceOf[Long]
+      e
+    }
+    assert(lits.forall(ts => expected.exists(ets => Math.abs(ets - ts) <= MICROS_PER_MINUTE)))
+  }
+
+  test("special timestamp_ltz values") {
+    testSpecialDatetimeValues { zoneId =>
+      val expected = Set(
+        Instant.ofEpochSecond(0),
+        Instant.now(),
+        Instant.now().atZone(zoneId).`with`(LocalTime.MIDNIGHT).plusDays(1).toInstant,
+        Instant.now().atZone(zoneId).`with`(LocalTime.MIDNIGHT).minusDays(1).toInstant
+      ).map(instantToMicros)
+      testSpecialTs(TimestampType, expected, zoneId)
+    }
+  }
+
+  test("special timestamp_ntz values") {
+    testSpecialDatetimeValues { zoneId =>
+      val expected = Set(
+        LocalDateTime.of(1970, 1, 1, 0, 0),
+        LocalDateTime.now(),
+        LocalDateTime.now().`with`(LocalTime.MIDNIGHT).plusDays(1),
+        LocalDateTime.now().`with`(LocalTime.MIDNIGHT).minusDays(1)
+      ).map(localDateTimeToMicros)
+      testSpecialTs(TimestampNTZType, expected, zoneId)
+    }
+  }
+}
