Posted to commits@spark.apache.org by we...@apache.org on 2019/04/10 13:55:20 UTC

[spark] branch master updated: [SPARK-27422][SQL] current_date() should return current date in the session time zone

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1470f23  [SPARK-27422][SQL] current_date() should return current date in the session time zone
1470f23 is described below

commit 1470f23ec93f13d6c847832c06d40f8fc9803129
Author: Maxim Gekk <ma...@gmail.com>
AuthorDate: Wed Apr 10 21:54:50 2019 +0800

    [SPARK-27422][SQL] current_date() should return current date in the session time zone
    
    ## What changes were proposed in this pull request?
    
    In the PR, I propose to revert two commits, https://github.com/apache/spark/commit/06abd06112965cd73417ccceacdbd94b6b3d2793 and https://github.com/apache/spark/commit/61561c1c2d4e47191fdfe9bf3539a3db29e89fa9, and to take the current date via `LocalDate.now` in the session time zone. The result is stored as the number of days since the epoch `1970-01-01`.
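    
    A minimal sketch of that conversion, with `sessionZone` standing in for the value of `spark.sql.session.timeZone` (the actual code goes through `DateTimeUtils.localDateToDays`, as in the diff below):
    
    ```scala
    import java.time.{LocalDate, ZoneId}
    
    // Stand-in for the session time zone; the value is illustrative only.
    val sessionZone: ZoneId = ZoneId.of("America/Los_Angeles")
    
    // The current date in that zone, encoded as days since the epoch
    // 1970-01-01, which is how DateType values are represented internally.
    val daysSinceEpoch: Int = LocalDate.now(sessionZone).toEpochDay.toInt
    ```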
    
    ## How was this patch tested?
    
    It was tested by `DateExpressionsSuite`, `DateFunctionsSuite`, `DateTimeUtilsSuite`, and `ComputeCurrentTimeSuite`.
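    
    These suites bound the evaluated date between two clock readings rather than asserting an exact value, so the checks survive a midnight rollover in the middle of a test. A standalone sketch of that pattern, using plain `java.time` in place of the Catalyst expression:
    
    ```scala
    import java.time.{LocalDate, ZoneOffset}
    
    val before = LocalDate.now(ZoneOffset.UTC).toEpochDay.toInt
    // In the real suite this middle reading is CurrentDate(gmtId).eval(EmptyRow).
    val evaluated = LocalDate.now(ZoneOffset.UTC).toEpochDay.toInt
    val after = LocalDate.now(ZoneOffset.UTC).toEpochDay.toInt
    
    // At most one midnight may pass between the outer readings.
    assert(before <= evaluated && evaluated <= after && after - before <= 1)
    ```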
    
    Closes #24330 from MaxGekk/current-date2.
    
    Lead-authored-by: Maxim Gekk <ma...@gmail.com>
    Co-authored-by: Maxim Gekk <ma...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 docs/sql-migration-guide-upgrade.md                |  2 --
 .../catalyst/expressions/datetimeExpressions.scala | 18 ++++++++++------
 .../sql/catalyst/optimizer/finishAnalysis.scala    | 25 +++++++++++-----------
 .../expressions/DateExpressionsSuite.scala         |  6 +++---
 .../optimizer/ComputeCurrentTimeSuite.scala        |  7 +++---
 .../execution/streaming/MicroBatchExecution.scala  |  2 +-
 .../scala/org/apache/spark/sql/functions.scala     |  2 +-
 .../apache/spark/sql/DataFrameAggregateSuite.scala |  2 +-
 .../org/apache/spark/sql/DateFunctionsSuite.scala  | 18 +++++++---------
 9 files changed, 40 insertions(+), 42 deletions(-)

diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md
index 741c510..b193522 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -120,8 +120,6 @@ license: |
 
   - In Spark version 2.4 and earlier, when reading a Hive Serde table with Spark native data sources(parquet/orc), Spark will infer the actual file schema and update the table schema in metastore. Since Spark 3.0, Spark doesn't infer the schema anymore. This should not cause any problems to end users, but if it does, please set `spark.sql.hive.caseSensitiveInferenceMode` to `INFER_AND_SAVE`.
 
-  - In Spark version 2.4 and earlier, the `current_date` function returns the current date shifted according to the SQL config `spark.sql.session.timeZone`. Since Spark 3.0, the function always returns the current date in the `UTC` time zone.
-
   - Since Spark 3.0, `TIMESTAMP` literals are converted to strings using the SQL config `spark.sql.session.timeZone`. In Spark version 2.4 and earlier, the conversion uses the default time zone of the Java virtual machine.
 
   - In Spark version 2.4, when a spark session is created via `cloneSession()`, the newly created spark session inherits its configuration from its parent `SparkContext` even though the same configuration may exist with a different value in its parent spark session. Since Spark 3.0, the configurations of a parent `SparkSession` have a higher precedence over the parent `SparkContext`.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index 5fb0b85..aad9f20 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.expressions
 
 import java.sql.Timestamp
-import java.time.{Instant, LocalDate, ZoneId, ZoneOffset}
+import java.time.{Instant, LocalDate, ZoneId}
 import java.time.temporal.IsoFields
 import java.util.{Locale, TimeZone}
 
@@ -54,26 +54,30 @@ trait TimeZoneAwareExpression extends Expression {
   @transient lazy val zoneId: ZoneId = DateTimeUtils.getZoneId(timeZoneId.get)
 }
 
-// scalastyle:off line.size.limit
 /**
- * Returns the current date in the UTC time zone at the start of query evaluation.
+ * Returns the current date at the start of query evaluation.
  * All calls of current_date within the same query return the same value.
  *
  * There is no code generation since this expression should get constant folded by the optimizer.
  */
 @ExpressionDescription(
-  usage = "_FUNC_() - Returns the current date in the UTC time zone at the start of query evaluation.",
+  usage = "_FUNC_() - Returns the current date at the start of query evaluation.",
   since = "1.5.0")
-// scalastyle:on line.size.limit
-case class CurrentDate() extends LeafExpression with CodegenFallback {
+case class CurrentDate(timeZoneId: Option[String] = None)
+  extends LeafExpression with TimeZoneAwareExpression with CodegenFallback {
+
+  def this() = this(None)
 
   override def foldable: Boolean = true
   override def nullable: Boolean = false
 
   override def dataType: DataType = DateType
 
+  override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
+    copy(timeZoneId = Option(timeZoneId))
+
   override def eval(input: InternalRow): Any = {
-    LocalDate.now(ZoneOffset.UTC).toEpochDay.toInt
+    localDateToDays(LocalDate.now(zoneId))
   }
 
   override def prettyName: String = "current_date"
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
index d0bf4ea..c213a21 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import java.util.concurrent.TimeUnit._
+import java.time.LocalDate
 
 import scala.collection.mutable
 
@@ -58,20 +58,19 @@ object ReplaceExpressions extends Rule[LogicalPlan] {
  */
 object ComputeCurrentTime extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = {
-    val currentDate = {
-      val dateExpr = CurrentDate()
-      val date = dateExpr.eval(EmptyRow).asInstanceOf[Int]
-      Literal.create(date, dateExpr.dataType)
-    }
-    val currentTimestamp = {
-      val timeExpr = CurrentTimestamp()
-      val timestamp = timeExpr.eval(EmptyRow).asInstanceOf[Long]
-      Literal.create(timestamp, timeExpr.dataType)
-    }
+    val currentDates = mutable.Map.empty[String, Literal]
+    val timeExpr = CurrentTimestamp()
+    val timestamp = timeExpr.eval(EmptyRow).asInstanceOf[Long]
+    val currentTime = Literal.create(timestamp, timeExpr.dataType)
 
     plan transformAllExpressions {
-      case CurrentDate() => currentDate
-      case CurrentTimestamp() => currentTimestamp
+      case CurrentDate(Some(timeZoneId)) =>
+        currentDates.getOrElseUpdate(timeZoneId, {
+          Literal.create(
+            LocalDate.now(DateTimeUtils.getZoneId(timeZoneId)),
+            DateType)
+        })
+      case CurrentTimestamp() => currentTime
     }
   }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
index bc2c575..64bf899 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
@@ -56,12 +56,12 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
 
   test("datetime function current_date") {
     val d0 = DateTimeUtils.millisToDays(System.currentTimeMillis(), TimeZoneGMT)
-    val cd = CurrentDate().eval(EmptyRow).asInstanceOf[Int]
+    val cd = CurrentDate(gmtId).eval(EmptyRow).asInstanceOf[Int]
     val d1 = DateTimeUtils.millisToDays(System.currentTimeMillis(), TimeZoneGMT)
     assert(d0 <= cd && cd <= d1 && d1 - d0 <= 1)
 
-    val cdjst = CurrentDate().eval(EmptyRow).asInstanceOf[Int]
-    val cdpst = CurrentDate().eval(EmptyRow).asInstanceOf[Int]
+    val cdjst = CurrentDate(jstId).eval(EmptyRow).asInstanceOf[Int]
+    val cdpst = CurrentDate(pstId).eval(EmptyRow).asInstanceOf[Int]
     assert(cdpst <= cd && cd <= cdjst)
   }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
index c36b8da..10ed4e4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
@@ -17,13 +17,12 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import java.util.concurrent.TimeUnit.MILLISECONDS
-
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate, CurrentTimestamp, Literal}
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 
 class ComputeCurrentTimeSuite extends PlanTest {
   object Optimize extends RuleExecutor[LogicalPlan] {
@@ -52,9 +51,9 @@ class ComputeCurrentTimeSuite extends PlanTest {
   test("analyzer should replace current_date with literals") {
     val in = Project(Seq(Alias(CurrentDate(), "a")(), Alias(CurrentDate(), "b")()), LocalRelation())
 
-    val min = MILLISECONDS.toDays(System.currentTimeMillis())
+    val min = DateTimeUtils.millisToDays(System.currentTimeMillis())
     val plan = Optimize.execute(in.analyze).asInstanceOf[Project]
-    val max = MILLISECONDS.toDays(System.currentTimeMillis())
+    val max = DateTimeUtils.millisToDays(System.currentTimeMillis())
 
     val lits = new scala.collection.mutable.ArrayBuffer[Int]
     plan.transformAllExpressions { case e: Literal =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 59a4afb..fdd80cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -514,7 +514,7 @@ class MicroBatchExecution(
           ct.dataType, Some("Dummy TimeZoneId"))
       case cd: CurrentDate =>
         CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
-          cd.dataType, Some("UTC"))
+          cd.dataType, cd.timeZoneId)
     }
 
     val triggerLogicalPlan = sink match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index d0be216..bcb5783 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2572,7 +2572,7 @@ object functions {
   }
 
   /**
-   * Returns the current date in the UTC time zone as a date column.
+   * Returns the current date as a date column.
    *
    * @group datetime_funcs
    * @since 1.5.0
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index d7cd15f..73259a0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -720,7 +720,7 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {
     assert(testData.groupBy(col("key")).toString.contains(
       "[grouping expressions: [key], value: [key: int, value: string], type: GroupBy]"))
     assert(testData.groupBy(current_date()).toString.contains(
-      "grouping expressions: [current_date()], value: [key: int, value: string], " +
+      "grouping expressions: [current_date(None)], value: [key: int, value: string], " +
         "type: GroupBy]"))
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
index 5ad1cb3..29cef69 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
@@ -19,11 +19,10 @@ package org.apache.spark.sql
 
 import java.sql.{Date, Timestamp}
 import java.text.SimpleDateFormat
-import java.time.LocalDate
 import java.util.Locale
 import java.util.concurrent.TimeUnit
 
-import org.apache.spark.sql.catalyst.util.DateTimeUtils._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -33,14 +32,13 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext {
   import testImplicits._
 
   test("function current_date") {
-    withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> "true") {
-      val df1 = Seq((1, 2), (3, 1)).toDF("a", "b")
-      val d0 = System.currentTimeMillis() / MILLIS_PER_DAY
-      val d1 = localDateToDays(df1.select(current_date()).collect().head.getAs[LocalDate](0))
-      val d2 = localDateToDays(sql("""SELECT CURRENT_DATE()""").collect().head.getAs[LocalDate](0))
-      val d3 = System.currentTimeMillis() / MILLIS_PER_DAY
-      assert(d0 <= d1 && d1 <= d2 && d2 <= d3 && d3 - d0 <= 1)
-    }
+    val df1 = Seq((1, 2), (3, 1)).toDF("a", "b")
+    val d0 = DateTimeUtils.millisToDays(System.currentTimeMillis())
+    val d1 = DateTimeUtils.fromJavaDate(df1.select(current_date()).collect().head.getDate(0))
+    val d2 = DateTimeUtils.fromJavaDate(
+      sql("""SELECT CURRENT_DATE()""").collect().head.getDate(0))
+    val d3 = DateTimeUtils.millisToDays(System.currentTimeMillis())
+    assert(d0 <= d1 && d1 <= d2 && d2 <= d3 && d3 - d0 <= 1)
   }
 
   test("function current_timestamp and now") {

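With this change, `current_date()` again follows the SQL config `spark.sql.session.timeZone`, and the optimizer still folds it to one literal per query (and per time zone). A quick usage sketch, assuming an active `SparkSession` in scope as `spark`; the zone value is illustrative:

```scala
import org.apache.spark.sql.functions.current_date

// Dates are now computed in the session time zone rather than UTC.
spark.conf.set("spark.sql.session.timeZone", "Asia/Tokyo")

// Both calls evaluate once at the start of the query and return the same
// value for every row, since current_date() is constant-folded.
spark.sql("SELECT current_date()").show()
spark.range(3).select(current_date()).show()
```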
