You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/08/03 09:42:22 UTC

[flink] branch master updated: [FLINK-6846] [table] Add timestamp addition in Table API

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a369a0  [FLINK-6846] [table] Add timestamp addition in Table API
4a369a0 is described below

commit 4a369a0362a7e626e7bf746ed048153e010d92c7
Author: xueyu <27...@qq.com>
AuthorDate: Wed Jun 20 21:30:02 2018 +0800

    [FLINK-6846] [table] Add timestamp addition in Table API
    
    It adds all temporal intervals known to SQL to the Table API
    for timestamp/interval arithmetic.
    
    Replaces the deprecated "quarter()" function.
    
    This closes #6188.
---
 docs/dev/table/tableApi.md                         |   4 +-
 .../flink/table/api/scala/expressionDsl.scala      |  69 ++++++----
 .../flink/table/expressions/ExpressionParser.scala |  12 +-
 .../apache/flink/table/expressions/symbols.scala   |   3 +-
 .../org/apache/flink/table/expressions/time.scala  |   3 +-
 .../table/expressions/ScalarFunctionsTest.scala    | 142 +++++++++++++--------
 6 files changed, 148 insertions(+), 85 deletions(-)

diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index bd9d286..9317122 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -1613,7 +1613,7 @@ suffixed = interval | cast | as | if | functionCall ;
 
 interval = timeInterval | rowInterval ;
 
-timeInterval = composite , "." , ("year" | "years" | "month" | "months" | "day" | "days" | "hour" | "hours" | "minute" | "minutes" | "second" | "seconds" | "milli" | "millis") ;
+timeInterval = composite , "." , ("year" | "years" | "quarter" | "quarters" | "month" | "months" | "week" | "weeks" | "day" | "days" | "hour" | "hours" | "minute" | "minutes" | "second" | "seconds" | "milli" | "millis") ;
 
 rowInterval = composite , "." , "rows" ;
 
@@ -1633,7 +1633,7 @@ fieldReference = "*" | identifier ;
 
 nullLiteral = "Null(" , dataType , ")" ;
 
-timeIntervalUnit = "YEAR" | "YEAR_TO_MONTH" | "MONTH" | "DAY" | "DAY_TO_HOUR" | "DAY_TO_MINUTE" | "DAY_TO_SECOND" | "HOUR" | "HOUR_TO_MINUTE" | "HOUR_TO_SECOND" | "MINUTE" | "MINUTE_TO_SECOND" | "SECOND" ;
+timeIntervalUnit = "YEAR" | "YEAR_TO_MONTH" | "MONTH" | "QUARTER" | "WEEK" | "DAY" | "DAY_TO_HOUR" | "DAY_TO_MINUTE" | "DAY_TO_SECOND" | "HOUR" | "HOUR_TO_MINUTE" | "HOUR_TO_SECOND" | "MINUTE" | "MINUTE_TO_SECOND" | "SECOND" ;
 
 timePointUnit = "YEAR" | "MONTH" | "DAY" | "HOUR" | "MINUTE" | "SECOND" | "QUARTER" | "WEEK" | "MILLISECOND" | "MICROSECOND" ;
 
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index 91d72ce..0989fcd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -574,17 +574,6 @@ trait ImplicitExpressionOperations {
   def extract(timeIntervalUnit: TimeIntervalUnit) = Extract(timeIntervalUnit, expr)
 
   /**
-    * Returns the quarter of a year from a SQL date.
-    *
-    * e.g. "1994-09-27".toDate.quarter() leads to 3
-    *
-    * @deprecated This method will be used for describing an interval of months in future versions.
-    *             Use `extract(TimeIntervalUnit.QUARTER)` instead.
-    */
-  @deprecated
-  def quarter() = Quarter(expr)
-
-  /**
     * Rounds down a time point to the given unit.
     *
     * e.g. "12:44:31".toDate.floor(MINUTE) leads to 12:44:00
@@ -605,98 +594,126 @@ trait ImplicitExpressionOperations {
     *
     * @return interval of months
     */
-  def year = toMonthInterval(expr, 12)
+  def year: Expression = toMonthInterval(expr, 12)
 
   /**
     * Creates an interval of the given number of years.
     *
     * @return interval of months
     */
-  def years = year
+  def years: Expression = year
+
+  /**
+    * Creates an interval of the given number of quarters (3 months per quarter).
+    *
+    * @return interval of months
+    */
+  def quarter: Expression = toMonthInterval(expr, 3)
+
+  /**
+    * Creates an interval of the given number of quarters; delegates to `quarter`.
+    *
+    * @return interval of months
+    */
+  def quarters: Expression = quarter
 
   /**
     * Creates an interval of the given number of months.
     *
     * @return interval of months
     */
-  def month = toMonthInterval(expr, 1)
+  def month: Expression = toMonthInterval(expr, 1)
 
   /**
     * Creates an interval of the given number of months.
     *
     * @return interval of months
     */
-  def months = month
+  def months: Expression = month
+
+  /**
+    * Creates an interval of the given number of weeks.
+    *
+    * @return interval of milliseconds
+    */
+  def week: Expression = toMilliInterval(expr, 7 * MILLIS_PER_DAY)
+
+  /**
+    * Creates an interval of the given number of weeks.
+    *
+    * @return interval of milliseconds
+    */
+  def weeks: Expression = week
 
   /**
     * Creates an interval of the given number of days.
     *
     * @return interval of milliseconds
     */
-  def day = toMilliInterval(expr, MILLIS_PER_DAY)
+  def day: Expression = toMilliInterval(expr, MILLIS_PER_DAY)
 
   /**
     * Creates an interval of the given number of days.
     *
     * @return interval of milliseconds
     */
-  def days = day
+  def days: Expression = day
 
   /**
     * Creates an interval of the given number of hours.
     *
     * @return interval of milliseconds
     */
-  def hour = toMilliInterval(expr, MILLIS_PER_HOUR)
+  def hour: Expression = toMilliInterval(expr, MILLIS_PER_HOUR)
 
   /**
     * Creates an interval of the given number of hours.
     *
     * @return interval of milliseconds
     */
-  def hours = hour
+  def hours: Expression = hour
 
   /**
     * Creates an interval of the given number of minutes.
     *
     * @return interval of milliseconds
     */
-  def minute = toMilliInterval(expr, MILLIS_PER_MINUTE)
+  def minute: Expression = toMilliInterval(expr, MILLIS_PER_MINUTE)
 
   /**
     * Creates an interval of the given number of minutes.
     *
     * @return interval of milliseconds
     */
-  def minutes = minute
+  def minutes: Expression = minute
 
   /**
     * Creates an interval of the given number of seconds.
     *
     * @return interval of milliseconds
     */
-  def second = toMilliInterval(expr, MILLIS_PER_SECOND)
+  def second: Expression = toMilliInterval(expr, MILLIS_PER_SECOND)
 
   /**
     * Creates an interval of the given number of seconds.
     *
     * @return interval of milliseconds
     */
-  def seconds = second
+  def seconds: Expression = second
 
   /**
     * Creates an interval of the given number of milliseconds.
     *
     * @return interval of milliseconds
     */
-  def milli = toMilliInterval(expr, 1)
+  def milli: Expression = toMilliInterval(expr, 1)
 
   /**
     * Creates an interval of the given number of milliseconds.
     *
     * @return interval of milliseconds
     */
-  def millis = milli
+  def millis: Expression = milli
 
   // Row interval type
 
@@ -705,7 +722,7 @@ trait ImplicitExpressionOperations {
     *
     * @return interval of rows
     */
-  def rows = toRowInterval(expr)
+  def rows: Expression = toRowInterval(expr)
 
   // Advanced type helper functions
 
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
index faf6268..4b2440c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
@@ -60,8 +60,12 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   lazy val LOG: Keyword = Keyword("log")
   lazy val YEARS: Keyword = Keyword("years")
   lazy val YEAR: Keyword = Keyword("year")
+  lazy val QUARTERS: Keyword = Keyword("quarters")
+  lazy val QUARTER: Keyword = Keyword("quarter")
   lazy val MONTHS: Keyword = Keyword("months")
   lazy val MONTH: Keyword = Keyword("month")
+  lazy val WEEKS: Keyword = Keyword("weeks")
+  lazy val WEEK: Keyword = Keyword("week")
   lazy val DAYS: Keyword = Keyword("days")
   lazy val DAY: Keyword = Keyword("day")
   lazy val HOURS: Keyword = Keyword("hours")
@@ -273,13 +277,17 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
     composite <~ "." ~ TO_TIME ~ opt("()") ^^ { e => Cast(e, SqlTimeTypeInfo.TIME) }
 
   lazy val suffixTimeInterval : PackratParser[Expression] =
-    composite ~ "." ~ (YEARS | MONTHS | DAYS | HOURS | MINUTES | SECONDS | MILLIS |
-      YEAR | MONTH | DAY | HOUR | MINUTE | SECOND | MILLI) ^^ {
+    composite ~ "." ~ (YEARS | QUARTERS | MONTHS | WEEKS | DAYS |  HOURS | MINUTES |
+      SECONDS | MILLIS | YEAR | QUARTER | MONTH | WEEK | DAY | HOUR | MINUTE | SECOND | MILLI) ^^ {
 
     case expr ~ _ ~ (YEARS.key | YEAR.key) => toMonthInterval(expr, 12)
 
+    case expr ~ _ ~ (QUARTERS.key | QUARTER.key) => toMonthInterval(expr, 3)
+
     case expr ~ _ ~ (MONTHS.key | MONTH.key) => toMonthInterval(expr, 1)
 
+    case expr ~ _ ~ (WEEKS.key | WEEK.key) => toMilliInterval(expr, 7 * MILLIS_PER_DAY)
+
     case expr ~ _ ~ (DAYS.key | DAY.key) => toMilliInterval(expr, MILLIS_PER_DAY)
 
     case expr ~ _ ~ (HOURS.key | HOUR.key) => toMilliInterval(expr, MILLIS_PER_HOUR)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/symbols.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/symbols.scala
index ec127e2..78ad3e2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/symbols.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/symbols.scala
@@ -84,8 +84,9 @@ object TimeIntervalUnit extends TableSymbols {
 
   val YEAR = Value(TimeUnitRange.YEAR)
   val YEAR_TO_MONTH = Value(TimeUnitRange.YEAR_TO_MONTH)
-  val MONTH = Value(TimeUnitRange.MONTH)
   val QUARTER = Value(TimeUnitRange.QUARTER)
+  val MONTH = Value(TimeUnitRange.MONTH)
+  val WEEK = Value(TimeUnitRange.WEEK)
   val DAY = Value(TimeUnitRange.DAY)
   val DAY_TO_HOUR = Value(TimeUnitRange.DAY_TO_HOUR)
   val DAY_TO_MINUTE = Value(TimeUnitRange.DAY_TO_MINUTE)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
index 5dff774..ac996f6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
@@ -47,8 +47,9 @@ case class Extract(timeIntervalUnit: Expression, temporal: Expression) extends E
 
     timeIntervalUnit match {
       case SymbolExpression(TimeIntervalUnit.YEAR)
-           | SymbolExpression(TimeIntervalUnit.MONTH)
            | SymbolExpression(TimeIntervalUnit.QUARTER)
+           | SymbolExpression(TimeIntervalUnit.MONTH)
+           | SymbolExpression(TimeIntervalUnit.WEEK)
            | SymbolExpression(TimeIntervalUnit.DAY)
         if temporal.resultType == SqlTimeTypeInfo.DATE
           || temporal.resultType == SqlTimeTypeInfo.TIMESTAMP
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index 453de19..d1ff806 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -1443,12 +1443,24 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
       "1996")
 
     testAllApis(
+      'f16.extract(TimeIntervalUnit.QUARTER),
+      "f16.extract(QUARTER)",
+      "EXTRACT(QUARTER FROM f16)",
+      "4")
+
+    testAllApis(
       'f16.extract(TimeIntervalUnit.MONTH),
       "extract(f16, MONTH)",
       "EXTRACT(MONTH FROM f16)",
       "11")
 
     testAllApis(
+      'f16.extract(TimeIntervalUnit.WEEK),
+      "extract(f16, WEEK)",
+      "EXTRACT(WEEK FROM f16)",
+      "45")
+
+    testAllApis(
       'f16.extract(TimeIntervalUnit.DAY),
       "f16.extract(DAY)",
       "EXTRACT(DAY FROM f16)",
@@ -1461,12 +1473,30 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
       "1996")
 
     testAllApis(
+      'f18.extract(TimeIntervalUnit.QUARTER),
+      "f18.extract(QUARTER)",
+      "EXTRACT(QUARTER FROM f18)",
+      "4")
+
+    testAllApis(
+      'f16.extract(TimeIntervalUnit.QUARTER),
+      "f16.extract(QUARTER)",
+      "EXTRACT(QUARTER FROM f16)",
+      "4")
+
+    testAllApis(
       'f18.extract(TimeIntervalUnit.MONTH),
       "f18.extract(MONTH)",
       "EXTRACT(MONTH FROM f18)",
       "11")
 
     testAllApis(
+      'f18.extract(TimeIntervalUnit.WEEK),
+      "f18.extract(WEEK)",
+      "EXTRACT(WEEK FROM f18)",
+      "45")
+
+    testAllApis(
       'f18.extract(TimeIntervalUnit.DAY),
       "f18.extract(DAY)",
       "EXTRACT(DAY FROM f18)",
@@ -1539,23 +1569,17 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
       "1")
 
     testAllApis(
+      'f20.extract(TimeIntervalUnit.QUARTER),
+      "f20.extract(QUARTER)",
+      "EXTRACT(QUARTER FROM f20)",
+      "1")
+
+    testAllApis(
       'f20.extract(TimeIntervalUnit.YEAR),
       "f20.extract(YEAR)",
       "EXTRACT(YEAR FROM f20)",
       "2")
 
-    testAllApis(
-      'f18.extract(TimeIntervalUnit.QUARTER),
-      "f18.extract(QUARTER)",
-      "EXTRACT(QUARTER FROM f18)",
-      "4")
-
-    testAllApis(
-      'f16.extract(TimeIntervalUnit.QUARTER),
-      "f16.extract(QUARTER)",
-      "EXTRACT(QUARTER FROM f16)",
-      "4")
-
     // test SQL only time units
     testSqlApi(
       "EXTRACT(MILLENNIUM FROM f18)",
@@ -1892,34 +1916,13 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   }
 
   @Test
-  def testQuarter(): Unit = {
-    testAllApis(
-      "1997-01-27".toDate.quarter(),
-      "'1997-01-27'.toDate.quarter()",
-      "QUARTER(DATE '1997-01-27')",
-      "1")
-
-    testAllApis(
-      "1997-04-27".toDate.quarter(),
-      "'1997-04-27'.toDate.quarter()",
-      "QUARTER(DATE '1997-04-27')",
-      "2")
-
-    testAllApis(
-      "1997-12-31".toDate.quarter(),
-      "'1997-12-31'.toDate.quarter()",
-      "QUARTER(DATE '1997-12-31')",
-      "4")
-  }
-
-  @Test
   def testTimestampAdd(): Unit = {
     val data = Seq(
-      (1, "TIMESTAMP '2017-11-29 22:58:58.998'"),
-      (3, "TIMESTAMP '2017-11-29 22:58:58.998'"),
-      (-1, "TIMESTAMP '2017-11-29 22:58:58.998'"),
-      (-61, "TIMESTAMP '2017-11-29 22:58:58.998'"),
-      (-1000, "TIMESTAMP '2017-11-29 22:58:58.998'")
+      (1, "2017-11-29 22:58:58.998"),
+      (3, "2017-11-29 22:58:58.998"),
+      (-1, "2017-11-29 22:58:58.998"),
+      (-61, "2017-11-29 22:58:58.998"),
+      (-1000, "2017-11-29 22:58:58.998")
     )
 
     val YEAR = Seq(
@@ -1998,27 +2001,60 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
       "SQL_TSI_SECOND" -> SECOND
     )
 
-    for ((interval, result) <- intervalMapResults) {
-      testSqlApi(
-        s"TIMESTAMPADD($interval, ${data.head._1}, ${data.head._2})", result.head)
-      testSqlApi(
-        s"TIMESTAMPADD($interval, ${data(1)._1}, ${data(1)._2})", result(1))
-      testSqlApi(
-        s"TIMESTAMPADD($interval, ${data(2)._1}, ${data(2)._2})", result(2))
-      testSqlApi(
-        s"TIMESTAMPADD($interval, ${data(3)._1}, ${data(3)._2})", result(3))
-      testSqlApi(
-        s"TIMESTAMPADD($interval, ${data(4)._1}, ${data(4)._2})", result(4))
+    def intervalCount(interval: String, count: Int): (Expression, String) = interval match {
+      case "YEAR" => (count.years, s"$count.years")
+      case "SQL_TSI_YEAR" => (count.years, s"$count.years")
+      case "QUARTER" => (count.quarters, s"$count.quarters")
+      case "SQL_TSI_QUARTER" => (count.quarters, s"$count.quarters")
+      case "MONTH" => (count.months, s"$count.months")
+      case "SQL_TSI_MONTH" => (count.months, s"$count.months")
+      case "WEEK" => (count.weeks, s"$count.weeks")
+      case "SQL_TSI_WEEK" => (count.weeks, s"$count.weeks")
+      case "DAY" => (count.days, s"$count.days")
+      case "SQL_TSI_DAY" => (count.days, s"$count.days")
+      case "HOUR" => (count.hours, s"$count.hours")
+      case "SQL_TSI_HOUR" => (count.hours, s"$count.hours")
+      case "MINUTE" => (count.minutes, s"$count.minutes")
+      case "SQL_TSI_MINUTE" => (count.minutes, s"$count.minutes")
+      case "SECOND" => (count.seconds, s"$count.seconds")
+      case "SQL_TSI_SECOND" => (count.seconds, s"$count.seconds")
     }
 
-    testSqlApi("TIMESTAMPADD(HOUR, CAST(NULL AS INTEGER), TIMESTAMP '2016-02-24 12:42:25')", "null")
+    for ((interval, result) <- intervalMapResults) {
+      for (i <- data.indices) { // one check per (offset, timestamp) input pair
+        val (offset, ts) = data(i)
+        val (intervalExpr, intervalString) = intervalCount(interval, offset)
+        testAllApis(
+          intervalExpr + ts.toTimestamp,
+          s"$intervalString + '$ts'.toTimestamp",
+          s"TIMESTAMPADD($interval, $offset, TIMESTAMP '$ts')",
+          result(i))
+      }
+    }
 
-    testSqlApi("TIMESTAMPADD(HOUR, -200, CAST(NULL AS TIMESTAMP))", "null")
+    testAllApis(
+      "2016-02-24 12:42:25".toTimestamp + Null(Types.INTERVAL_MILLIS),
+      "'2016-02-24 12:42:25'.toTimestamp + Null(INTERVAL_MILLIS)",
+      "TIMESTAMPADD(HOUR, CAST(NULL AS INTEGER), TIMESTAMP '2016-02-24 12:42:25')",
+      "null")
 
-    testSqlApi("TIMESTAMPADD(DAY, 1, DATE '2016-06-15')", "2016-06-16")
+    testAllApis(
+      Null(Types.SQL_TIMESTAMP) + -200.hours,
+      "Null(SQL_TIMESTAMP) + -200.hours",
+      "TIMESTAMPADD(HOUR, -200, CAST(NULL AS TIMESTAMP))",
+      "null")
 
-    testSqlApi("TIMESTAMPADD(MONTH, 3, CAST(NULL AS TIMESTAMP))", "null")
+    testAllApis(
+      "2016-06-15".toDate + 1.day,
+      "'2016-06-15'.toDate + 1.day",
+      "TIMESTAMPADD(DAY, 1, DATE '2016-06-15')",
+      "2016-06-16")
 
+    testAllApis(
+      Null(Types.SQL_TIMESTAMP) + 3.months,
+      "Null(SQL_TIMESTAMP) + 3.months",
+      "TIMESTAMPADD(MONTH, 3, CAST(NULL AS TIMESTAMP))",
+      "null")
   }
 
   // ----------------------------------------------------------------------------------------------