You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2019/07/29 18:00:33 UTC

[spark] branch master updated: [SPARK-28459][SQL] Add `make_timestamp` function

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new caa23e3  [SPARK-28459][SQL] Add `make_timestamp` function
caa23e3 is described below

commit caa23e3efd3f4422e8f599b30bec3ef1fb33c03c
Author: Maxim Gekk <ma...@gmail.com>
AuthorDate: Mon Jul 29 11:00:08 2019 -0700

    [SPARK-28459][SQL] Add `make_timestamp` function
    
    ## What changes were proposed in this pull request?
    
    New function `make_timestamp()` takes 6 columns `year`, `month`, `day`, `hour`, `min`, `sec` + optionally `timezone`, and makes new column of the `TIMESTAMP` type. If values in the input columns are `null` or out of valid ranges, the function returns `null`. Valid ranges are:
    - `year` - `[1, 9999]`
    - `month` - `[1, 12]`
    - `day` - `[1, 31]`
    - `hour` - `[0, 23]`
    - `min` - `[0, 59]`
    - `sec` - `[0, 60]`. If the `sec` argument equals to 60, the seconds field is set to 0 and 1 minute is added to the final timestamp.
    - `timezone` - an identifier of timezone. Actual database of timezones can be found there: https://www.iana.org/time-zones.
    
    Also constructed timestamp must be valid otherwise `make_timestamp` returns `null`.
    
    The function is implemented similarly to `make_timestamp` in PostgreSQL: https://www.postgresql.org/docs/11/functions-datetime.html to maintain feature parity with it.
    
    Here is an example:
    ```sql
    select make_timestamp(2014, 12, 28, 6, 30, 45.887);
      2014-12-28 06:30:45.887
    select make_timestamp(2014, 12, 28, 6, 30, 45.887, 'CET');
      2014-12-28 10:30:45.887
    select make_timestamp(2019, 6, 30, 23, 59, 60)
      2019-07-01 00:00:00
    ```
    
    Returned value has Spark Catalyst type `TIMESTAMP` which is similar to Oracle's `TIMESTAMP WITH LOCAL TIME ZONE` (see https://docs.oracle.com/cd/B28359_01/server.111/b28298/ch4datetime.htm#i1006169) where data is stored in the session time zone, and the time zone offset is not stored as part of the column data. When users retrieve the data, Spark returns it in the session time zone specified by the SQL config `spark.sql.session.timeZone`.
    
    ## How was this patch tested?
    
    Added new tests to `DateExpressionsSuite`, and uncommented a test for `make_timestamp` in `pgSQL/timestamp.sql`.
    
    Closes #25220 from MaxGekk/make_timestamp.
    
    Authored-by: Maxim Gekk <ma...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../sql/catalyst/analysis/FunctionRegistry.scala   |   1 +
 .../sql/catalyst/expressions/Expression.scala      | 151 ++++++++++++++++++++
 .../catalyst/expressions/datetimeExpressions.scala | 155 ++++++++++++++++++++-
 .../expressions/DateExpressionsSuite.scala         |  34 ++++-
 .../resources/sql-tests/inputs/pgSQL/timestamp.sql |   3 +-
 .../sql-tests/results/pgSQL/timestamp.sql.out      |  14 +-
 6 files changed, 351 insertions(+), 7 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 7c88fd5..1aa5ec7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -416,6 +416,7 @@ object FunctionRegistry {
     expression[Year]("year"),
     expression[TimeWindow]("window"),
     expression[MakeDate]("make_date"),
+    expression[MakeTimestamp]("make_timestamp"),
 
     // collection functions
     expression[CreateArray]("array"),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index b793d81..64823a9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -864,6 +864,157 @@ abstract class QuaternaryExpression extends Expression {
 }
 
 /**
+ * An expression with six inputs + 7th optional input and one output.
+ * The output is by default evaluated to null if any input is evaluated to null.
+ */
+abstract class SeptenaryExpression extends Expression {
+
+  override def foldable: Boolean = children.forall(_.foldable)
+
+  override def nullable: Boolean = children.exists(_.nullable)
+
+  /**
+   * Default behavior of evaluation according to the default nullability of SeptenaryExpression.
+   * If subclass of SeptenaryExpression override nullable, probably should also override this.
+   */
+  override def eval(input: InternalRow): Any = {
+    val exprs = children
+    val v1 = exprs(0).eval(input)
+    if (v1 != null) {
+      val v2 = exprs(1).eval(input)
+      if (v2 != null) {
+        val v3 = exprs(2).eval(input)
+        if (v3 != null) {
+          val v4 = exprs(3).eval(input)
+          if (v4 != null) {
+            val v5 = exprs(4).eval(input)
+            if (v5 != null) {
+              val v6 = exprs(5).eval(input)
+              if (v6 != null) {
+                if (exprs.length > 6) {
+                  val v7 = exprs(6).eval(input)
+                  if (v7 != null) {
+                    return nullSafeEval(v1, v2, v3, v4, v5, v6, Some(v7))
+                  }
+                } else {
+                  return nullSafeEval(v1, v2, v3, v4, v5, v6, None)
+                }
+              }
+            }
+          }
+        }
+      }
+    }
+    null
+  }
+
+  /**
+   * Called by default [[eval]] implementation.  If subclass of SeptenaryExpression keep the
+   * default nullability, they can override this method to save null-check code.  If we need
+   * full control of evaluation process, we should override [[eval]].
+   */
+  protected def nullSafeEval(
+      input1: Any,
+      input2: Any,
+      input3: Any,
+      input4: Any,
+      input5: Any,
+      input6: Any,
+      input7: Option[Any]): Any = {
+    sys.error("SeptenaryExpression must override either eval or nullSafeEval")
+  }
+
+  /**
+   * Short hand for generating septenary evaluation code.
+   * If either of the sub-expressions is null, the result of this computation
+   * is assumed to be null.
+   *
+   * @param f accepts seven variable names and returns Java code to compute the output.
+   */
+  protected def defineCodeGen(
+      ctx: CodegenContext,
+      ev: ExprCode,
+      f: (String, String, String, String, String, String, Option[String]) => String
+    ): ExprCode = {
+    nullSafeCodeGen(ctx, ev, (eval1, eval2, eval3, eval4, eval5, eval6, eval7) => {
+      s"${ev.value} = ${f(eval1, eval2, eval3, eval4, eval5, eval6, eval7)};"
+    })
+  }
+
+  /**
+   * Short hand for generating septenary evaluation code.
+   * If either of the sub-expressions is null, the result of this computation
+   * is assumed to be null.
+   *
+   * @param f function that accepts the 7 non-null evaluation result names of children
+   *          and returns Java code to compute the output.
+   */
+  protected def nullSafeCodeGen(
+      ctx: CodegenContext,
+      ev: ExprCode,
+      f: (String, String, String, String, String, String, Option[String]) => String
+    ): ExprCode = {
+    val firstGen = children(0).genCode(ctx)
+    val secondGen = children(1).genCode(ctx)
+    val thirdGen = children(2).genCode(ctx)
+    val fourthGen = children(3).genCode(ctx)
+    val fifthGen = children(4).genCode(ctx)
+    val sixthGen = children(5).genCode(ctx)
+    val seventhGen = if (children.length > 6) Some(children(6).genCode(ctx)) else None
+    val resultCode = f(
+      firstGen.value,
+      secondGen.value,
+      thirdGen.value,
+      fourthGen.value,
+      fifthGen.value,
+      sixthGen.value,
+      seventhGen.map(_.value))
+
+    if (nullable) {
+      val nullSafeEval =
+        firstGen.code + ctx.nullSafeExec(children(0).nullable, firstGen.isNull) {
+          secondGen.code + ctx.nullSafeExec(children(1).nullable, secondGen.isNull) {
+            thirdGen.code + ctx.nullSafeExec(children(2).nullable, thirdGen.isNull) {
+              fourthGen.code + ctx.nullSafeExec(children(3).nullable, fourthGen.isNull) {
+                fifthGen.code + ctx.nullSafeExec(children(4).nullable, fifthGen.isNull) {
+                  sixthGen.code + ctx.nullSafeExec(children(5).nullable, sixthGen.isNull) {
+                    val nullSafeResultCode =
+                      s"""
+                      ${ev.isNull} = false; // resultCode could change nullability.
+                      $resultCode
+                      """
+                    seventhGen.map { gen =>
+                      gen.code + ctx.nullSafeExec(children(6).nullable, gen.isNull) {
+                        nullSafeResultCode
+                      }
+                    }.getOrElse(nullSafeResultCode)
+                  }
+                }
+              }
+            }
+          }
+        }
+
+      ev.copy(code = code"""
+        boolean ${ev.isNull} = true;
+        ${CodeGenerator.javaType(dataType)} ${ev.value} = ${CodeGenerator.defaultValue(dataType)};
+        $nullSafeEval""")
+    } else {
+      ev.copy(code = code"""
+        ${firstGen.code}
+        ${secondGen.code}
+        ${thirdGen.code}
+        ${fourthGen.code}
+        ${fifthGen.code}
+        ${sixthGen.code}
+        ${seventhGen.map(_.code).getOrElse("")}
+        ${CodeGenerator.javaType(dataType)} ${ev.value} = ${CodeGenerator.defaultValue(dataType)};
+        $resultCode""", isNull = FalseLiteral)
+    }
+  }
+}
+
+/**
  * A trait used for resolving nullable flags, including `nullable`, `containsNull` of [[ArrayType]]
  * and `valueContainsNull` of [[MapType]], containsNull, valueContainsNull flags of the output date
  * type. This is usually utilized by the expressions (e.g. [[CaseWhen]]) that combine data from
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index 564895b..e50abeb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.expressions
 
 import java.sql.Timestamp
-import java.time.{Instant, LocalDate, ZoneId}
+import java.time.{DateTimeException, Instant, LocalDate, LocalDateTime, ZoneId}
 import java.time.temporal.IsoFields
 import java.util.{Locale, TimeZone}
 
@@ -1657,3 +1657,156 @@ case class MakeDate(year: Expression, month: Expression, day: Expression)
 
   override def prettyName: String = "make_date"
 }
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(year, month, day, hour, min, sec[, timezone]) - Create timestamp from year, month, day, hour, min, sec and timezone fields.",
+  arguments = """
+    Arguments:
+      * year - the year to represent, from 1 to 9999
+      * month - the month-of-year to represent, from 1 (January) to 12 (December)
+      * day - the day-of-month to represent, from 1 to 31
+      * hour - the hour-of-day to represent, from 0 to 23
+      * min - the minute-of-hour to represent, from 0 to 59
+      * sec - the second-of-minute and its micro-fraction to represent, from
+              0 to 60. If the sec argument equals to 60, the seconds field is set
+              to 0 and 1 minute is added to the final timestamp.
+      * timezone - the time zone identifier. For example, CET, UTC and etc.
+  """,
+  examples = """
+    Examples:
+      > SELECT _FUNC_(2014, 12, 28, 6, 30, 45.887);
+       2014-12-28 06:30:45.887
+      > SELECT _FUNC_(2014, 12, 28, 6, 30, 45.887, 'CET');
+       2014-12-28 10:30:45.887
+      > SELECT _FUNC_(2019, 6, 30, 23, 59, 60)
+       2019-07-01 00:00:00
+      > SELECT _FUNC_(2019, 13, 1, 10, 11, 12, 13);
+       NULL
+      > SELECT _FUNC_(null, 7, 22, 15, 30, 0);
+       NULL
+  """,
+  since = "3.0.0")
+// scalastyle:on line.size.limit
+case class MakeTimestamp(
+    year: Expression,
+    month: Expression,
+    day: Expression,
+    hour: Expression,
+    min: Expression,
+    sec: Expression,
+    timezone: Option[Expression] = None,
+    timeZoneId: Option[String] = None)
+  extends SeptenaryExpression with TimeZoneAwareExpression with ImplicitCastInputTypes {
+
+  def this(
+      year: Expression,
+      month: Expression,
+      day: Expression,
+      hour: Expression,
+      min: Expression,
+      sec: Expression) = {
+    this(year, month, day, hour, min, sec, None, None)
+  }
+
+  def this(
+      year: Expression,
+      month: Expression,
+      day: Expression,
+      hour: Expression,
+      min: Expression,
+      sec: Expression,
+      timezone: Expression) = {
+    this(year, month, day, hour, min, sec, Some(timezone), None)
+  }
+
+  override def children: Seq[Expression] = Seq(year, month, day, hour, min, sec) ++ timezone
+  override def inputTypes: Seq[AbstractDataType] =
+    Seq(IntegerType, IntegerType, IntegerType, IntegerType, IntegerType, DoubleType) ++
+    timezone.map(_ => StringType)
+  override def dataType: DataType = TimestampType
+  override def nullable: Boolean = true
+
+  override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
+    copy(timeZoneId = Option(timeZoneId))
+
+  private def toMicros(
+      year: Int,
+      month: Int,
+      day: Int,
+      hour: Int,
+      min: Int,
+      secAndNanos: Double,
+      zoneId: ZoneId): Any = {
+    try {
+      val seconds = secAndNanos.toInt
+      val nanos = ((secAndNanos - seconds) * NANOS_PER_SECOND).toInt
+      val ldt = if (seconds == 60) {
+        if (nanos == 0) {
+          // This case of sec = 60 and nanos = 0 is supported for compatibility with PostgreSQL
+          LocalDateTime.of(year, month, day, hour, min, 0, 0).plusMinutes(1)
+        } else {
+          throw new DateTimeException("The fraction of sec must be zero. Valid range is [0, 60].")
+        }
+      } else {
+        LocalDateTime.of(year, month, day, hour, min, seconds, nanos)
+      }
+      instantToMicros(ldt.atZone(zoneId).toInstant)
+    } catch {
+      case _: DateTimeException => null
+    }
+  }
+
+  override def nullSafeEval(
+      year: Any,
+      month: Any,
+      day: Any,
+      hour: Any,
+      min: Any,
+      sec: Any,
+      timezone: Option[Any]): Any = {
+    val zid = timezone
+      .map(tz => DateTimeUtils.getZoneId(tz.asInstanceOf[UTF8String].toString))
+      .getOrElse(zoneId)
+    toMicros(
+      year.asInstanceOf[Int],
+      month.asInstanceOf[Int],
+      day.asInstanceOf[Int],
+      hour.asInstanceOf[Int],
+      min.asInstanceOf[Int],
+      sec.asInstanceOf[Double],
+      zid)
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
+    val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName)
+    nullSafeCodeGen(ctx, ev, (year, month, day, hour, min, secAndNanos, timezone) => {
+      val zoneId = timezone.map(tz => s"$dtu.getZoneId(${tz}.toString())").getOrElse(zid)
+      s"""
+      try {
+        int seconds = (int)$secAndNanos;
+        int nanos = (int)(($secAndNanos - seconds) * 1000000000L);
+        java.time.LocalDateTime ldt;
+        if (seconds == 60) {
+          if (nanos == 0) {
+            ldt = java.time.LocalDateTime.of(
+              $year, $month, $day, $hour, $min, 0, 0).plusMinutes(1);
+          } else {
+            throw new java.time.DateTimeException(
+              "The fraction of sec must be zero. Valid range is [0, 60].");
+          }
+        } else {
+          ldt = java.time.LocalDateTime.of(
+            $year, $month, $day, $hour, $min, seconds, nanos);
+        }
+        java.time.Instant instant = ldt.atZone($zoneId).toInstant();
+        ${ev.value} = $dtu.instantToMicros(instant);
+      } catch (java.time.DateTimeException e) {
+        ${ev.isNull} = true;
+      }"""
+    })
+  }
+
+  override def prettyName: String = "make_timestamp"
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
index b4110af..30e10c5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
 
 import java.sql.{Date, Timestamp}
 import java.text.SimpleDateFormat
-import java.time.ZoneOffset
+import java.time.{ZoneId, ZoneOffset}
 import java.util.{Calendar, Locale, TimeZone}
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.TimeUnit._
@@ -928,4 +928,36 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     checkEvaluation(MakeDate(Literal(2019), Literal(13), Literal(19)), null)
     checkEvaluation(MakeDate(Literal(2019), Literal(7), Literal(32)), null)
   }
+
+  test("creating values of TimestampType via make_timestamp") {
+    var makeTimestampExpr = MakeTimestamp(
+      Literal(2013), Literal(7), Literal(15), Literal(8), Literal(15), Literal(23.5),
+      Some(Literal(ZoneId.systemDefault().getId)))
+    val expected = Timestamp.valueOf("2013-7-15 8:15:23.5")
+    checkEvaluation(makeTimestampExpr, expected)
+    checkEvaluation(makeTimestampExpr.copy(timezone = None), expected)
+
+    checkEvaluation(makeTimestampExpr.copy(year = Literal.create(null, IntegerType)), null)
+    checkEvaluation(makeTimestampExpr.copy(year = Literal(Int.MaxValue)), null)
+
+    checkEvaluation(makeTimestampExpr.copy(month = Literal.create(null, IntegerType)), null)
+    checkEvaluation(makeTimestampExpr.copy(month = Literal(13)), null)
+
+    checkEvaluation(makeTimestampExpr.copy(day = Literal.create(null, IntegerType)), null)
+    checkEvaluation(makeTimestampExpr.copy(day = Literal(32)), null)
+
+    checkEvaluation(makeTimestampExpr.copy(hour = Literal.create(null, IntegerType)), null)
+    checkEvaluation(makeTimestampExpr.copy(hour = Literal(25)), null)
+
+    checkEvaluation(makeTimestampExpr.copy(min = Literal.create(null, IntegerType)), null)
+    checkEvaluation(makeTimestampExpr.copy(min = Literal(65)), null)
+
+    checkEvaluation(makeTimestampExpr.copy(sec = Literal.create(null, DoubleType)), null)
+    checkEvaluation(makeTimestampExpr.copy(sec = Literal(70.0)), null)
+
+    makeTimestampExpr = MakeTimestamp(Literal(2019), Literal(6), Literal(30),
+      Literal(23), Literal(59), Literal(60.0))
+    checkEvaluation(makeTimestampExpr, Timestamp.valueOf("2019-07-01 00:00:00"))
+    checkEvaluation(makeTimestampExpr.copy(sec = Literal(60.5)), null)
+  }
 }
diff --git a/sql/core/src/test/resources/sql-tests/inputs/pgSQL/timestamp.sql b/sql/core/src/test/resources/sql-tests/inputs/pgSQL/timestamp.sql
index 02af15a..2b974816 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/pgSQL/timestamp.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/pgSQL/timestamp.sql
@@ -240,8 +240,7 @@ SELECT '' AS date_trunc_week, date_trunc( 'week', timestamp '2004-02-29 15:44:17
 --    FROM TIMESTAMP_TBL;
 
 
---[SPARK-28432] Missing Date/Time Functions: make_timestamp
 -- timestamp numeric fields constructor
--- SELECT make_timestamp(2014,12,28,6,30,45.887);
+SELECT make_timestamp(2014,12,28,6,30,45.887);
 
 DROP TABLE TIMESTAMP_TBL;
diff --git a/sql/core/src/test/resources/sql-tests/results/pgSQL/timestamp.sql.out b/sql/core/src/test/resources/sql-tests/results/pgSQL/timestamp.sql.out
index 200fecc..75b63ac 100644
--- a/sql/core/src/test/resources/sql-tests/results/pgSQL/timestamp.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/pgSQL/timestamp.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 14
+-- Number of queries: 15
 
 
 -- !query 0
@@ -123,8 +123,16 @@ struct<date_trunc_week:string,week_trunc:timestamp>
 
 
 -- !query 13
-DROP TABLE TIMESTAMP_TBL
+SELECT make_timestamp(2014,12,28,6,30,45.887)
 -- !query 13 schema
-struct<>
+struct<make_timestamp(2014, 12, 28, 6, 30, CAST(45.887 AS DOUBLE)):timestamp>
 -- !query 13 output
+2014-12-28 06:30:45.887
+
+
+-- !query 14
+DROP TABLE TIMESTAMP_TBL
+-- !query 14 schema
+struct<>
+-- !query 14 output
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org