You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/08/03 09:42:22 UTC
[flink] branch master updated: [FLINK-6846] [table] Add timestamp
addition in Table API
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4a369a0 [FLINK-6846] [table] Add timestamp addition in Table API
4a369a0 is described below
commit 4a369a0362a7e626e7bf746ed048153e010d92c7
Author: xueyu <27...@qq.com>
AuthorDate: Wed Jun 20 21:30:02 2018 +0800
[FLINK-6846] [table] Add timestamp addition in Table API
It adds all temporal intervals known to SQL to the Table API
for timestamp/interval arithmetic.
Replaces the deprecated "quarter()" function.
This closes #6188.
---
docs/dev/table/tableApi.md | 4 +-
.../flink/table/api/scala/expressionDsl.scala | 69 ++++++----
.../flink/table/expressions/ExpressionParser.scala | 12 +-
.../apache/flink/table/expressions/symbols.scala | 3 +-
.../org/apache/flink/table/expressions/time.scala | 3 +-
.../table/expressions/ScalarFunctionsTest.scala | 142 +++++++++++++--------
6 files changed, 148 insertions(+), 85 deletions(-)
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index bd9d286..9317122 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -1613,7 +1613,7 @@ suffixed = interval | cast | as | if | functionCall ;
interval = timeInterval | rowInterval ;
-timeInterval = composite , "." , ("year" | "years" | "month" | "months" | "day" | "days" | "hour" | "hours" | "minute" | "minutes" | "second" | "seconds" | "milli" | "millis") ;
+timeInterval = composite , "." , ("year" | "years" | "quarter" | "quarters" | "month" | "months" | "week" | "weeks" | "day" | "days" | "hour" | "hours" | "minute" | "minutes" | "second" | "seconds" | "milli" | "millis") ;
rowInterval = composite , "." , "rows" ;
@@ -1633,7 +1633,7 @@ fieldReference = "*" | identifier ;
nullLiteral = "Null(" , dataType , ")" ;
-timeIntervalUnit = "YEAR" | "YEAR_TO_MONTH" | "MONTH" | "DAY" | "DAY_TO_HOUR" | "DAY_TO_MINUTE" | "DAY_TO_SECOND" | "HOUR" | "HOUR_TO_MINUTE" | "HOUR_TO_SECOND" | "MINUTE" | "MINUTE_TO_SECOND" | "SECOND" ;
+timeIntervalUnit = "YEAR" | "YEAR_TO_MONTH" | "MONTH" | "QUARTER" | "WEEK" | "DAY" | "DAY_TO_HOUR" | "DAY_TO_MINUTE" | "DAY_TO_SECOND" | "HOUR" | "HOUR_TO_MINUTE" | "HOUR_TO_SECOND" | "MINUTE" | "MINUTE_TO_SECOND" | "SECOND" ;
timePointUnit = "YEAR" | "MONTH" | "DAY" | "HOUR" | "MINUTE" | "SECOND" | "QUARTER" | "WEEK" | "MILLISECOND" | "MICROSECOND" ;
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index 91d72ce..0989fcd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -574,17 +574,6 @@ trait ImplicitExpressionOperations {
def extract(timeIntervalUnit: TimeIntervalUnit) = Extract(timeIntervalUnit, expr)
/**
- * Returns the quarter of a year from a SQL date.
- *
- * e.g. "1994-09-27".toDate.quarter() leads to 3
- *
- * @deprecated This method will be used for describing an interval of months in future versions.
- * Use `extract(TimeIntervalUnit.QUARTER)` instead.
- */
- @deprecated
- def quarter() = Quarter(expr)
-
- /**
* Rounds down a time point to the given unit.
*
* e.g. "12:44:31".toDate.floor(MINUTE) leads to 12:44:00
@@ -605,98 +594,126 @@ trait ImplicitExpressionOperations {
*
* @return interval of months
*/
- def year = toMonthInterval(expr, 12)
+ def year: Expression = toMonthInterval(expr, 12)
/**
* Creates an interval of the given number of years.
*
* @return interval of months
*/
- def years = year
+ def years: Expression = year
+
+ /**
+ * Creates an interval of the given number of quarters.
+ *
+ * @return interval of months
+ */
+ def quarter: Expression = toMonthInterval(expr, 3)
+
+ /**
+ * Creates an interval of the given number of quarters.
+ *
+ * @return interval of months
+ */
+ def quarters: Expression = quarter
/**
* Creates an interval of the given number of months.
*
* @return interval of months
*/
- def month = toMonthInterval(expr, 1)
+ def month: Expression = toMonthInterval(expr, 1)
/**
* Creates an interval of the given number of months.
*
* @return interval of months
*/
- def months = month
+ def months: Expression = month
+
+ /**
+ * Creates an interval of the given number of weeks.
+ *
+ * @return interval of milliseconds
+ */
+ def week: Expression = toMilliInterval(expr, 7 * MILLIS_PER_DAY)
+
+ /**
+ * Creates an interval of the given number of weeks.
+ *
+ * @return interval of milliseconds
+ */
+ def weeks: Expression = week
/**
* Creates an interval of the given number of days.
*
* @return interval of milliseconds
*/
- def day = toMilliInterval(expr, MILLIS_PER_DAY)
+ def day: Expression = toMilliInterval(expr, MILLIS_PER_DAY)
/**
* Creates an interval of the given number of days.
*
* @return interval of milliseconds
*/
- def days = day
+ def days: Expression = day
/**
* Creates an interval of the given number of hours.
*
* @return interval of milliseconds
*/
- def hour = toMilliInterval(expr, MILLIS_PER_HOUR)
+ def hour: Expression = toMilliInterval(expr, MILLIS_PER_HOUR)
/**
* Creates an interval of the given number of hours.
*
* @return interval of milliseconds
*/
- def hours = hour
+ def hours: Expression = hour
/**
* Creates an interval of the given number of minutes.
*
* @return interval of milliseconds
*/
- def minute = toMilliInterval(expr, MILLIS_PER_MINUTE)
+ def minute: Expression = toMilliInterval(expr, MILLIS_PER_MINUTE)
/**
* Creates an interval of the given number of minutes.
*
* @return interval of milliseconds
*/
- def minutes = minute
+ def minutes: Expression = minute
/**
* Creates an interval of the given number of seconds.
*
* @return interval of milliseconds
*/
- def second = toMilliInterval(expr, MILLIS_PER_SECOND)
+ def second: Expression = toMilliInterval(expr, MILLIS_PER_SECOND)
/**
* Creates an interval of the given number of seconds.
*
* @return interval of milliseconds
*/
- def seconds = second
+ def seconds: Expression = second
/**
* Creates an interval of the given number of milliseconds.
*
* @return interval of milliseconds
*/
- def milli = toMilliInterval(expr, 1)
+ def milli: Expression = toMilliInterval(expr, 1)
/**
* Creates an interval of the given number of milliseconds.
*
* @return interval of milliseconds
*/
- def millis = milli
+ def millis: Expression = milli
// Row interval type
@@ -705,7 +722,7 @@ trait ImplicitExpressionOperations {
*
* @return interval of rows
*/
- def rows = toRowInterval(expr)
+ def rows: Expression = toRowInterval(expr)
// Advanced type helper functions
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
index faf6268..4b2440c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
@@ -60,8 +60,12 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
lazy val LOG: Keyword = Keyword("log")
lazy val YEARS: Keyword = Keyword("years")
lazy val YEAR: Keyword = Keyword("year")
+ lazy val QUARTERS: Keyword = Keyword("quarters")
+ lazy val QUARTER: Keyword = Keyword("quarter")
lazy val MONTHS: Keyword = Keyword("months")
lazy val MONTH: Keyword = Keyword("month")
+ lazy val WEEKS: Keyword = Keyword("weeks")
+ lazy val WEEK: Keyword = Keyword("week")
lazy val DAYS: Keyword = Keyword("days")
lazy val DAY: Keyword = Keyword("day")
lazy val HOURS: Keyword = Keyword("hours")
@@ -273,13 +277,17 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
composite <~ "." ~ TO_TIME ~ opt("()") ^^ { e => Cast(e, SqlTimeTypeInfo.TIME) }
lazy val suffixTimeInterval : PackratParser[Expression] =
- composite ~ "." ~ (YEARS | MONTHS | DAYS | HOURS | MINUTES | SECONDS | MILLIS |
- YEAR | MONTH | DAY | HOUR | MINUTE | SECOND | MILLI) ^^ {
+ composite ~ "." ~ (YEARS | QUARTERS | MONTHS | WEEKS | DAYS | HOURS | MINUTES |
+ SECONDS | MILLIS | YEAR | QUARTER | MONTH | WEEK | DAY | HOUR | MINUTE | SECOND | MILLI) ^^ {
case expr ~ _ ~ (YEARS.key | YEAR.key) => toMonthInterval(expr, 12)
+ case expr ~ _ ~ (QUARTERS.key | QUARTER.key) => toMonthInterval(expr, 3)
+
case expr ~ _ ~ (MONTHS.key | MONTH.key) => toMonthInterval(expr, 1)
+ case expr ~ _ ~ (WEEKS.key | WEEKS.key) => toMilliInterval(expr, 7 * MILLIS_PER_DAY)
+
case expr ~ _ ~ (DAYS.key | DAY.key) => toMilliInterval(expr, MILLIS_PER_DAY)
case expr ~ _ ~ (HOURS.key | HOUR.key) => toMilliInterval(expr, MILLIS_PER_HOUR)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/symbols.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/symbols.scala
index ec127e2..78ad3e2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/symbols.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/symbols.scala
@@ -84,8 +84,9 @@ object TimeIntervalUnit extends TableSymbols {
val YEAR = Value(TimeUnitRange.YEAR)
val YEAR_TO_MONTH = Value(TimeUnitRange.YEAR_TO_MONTH)
- val MONTH = Value(TimeUnitRange.MONTH)
val QUARTER = Value(TimeUnitRange.QUARTER)
+ val MONTH = Value(TimeUnitRange.MONTH)
+ val WEEK = Value(TimeUnitRange.WEEK)
val DAY = Value(TimeUnitRange.DAY)
val DAY_TO_HOUR = Value(TimeUnitRange.DAY_TO_HOUR)
val DAY_TO_MINUTE = Value(TimeUnitRange.DAY_TO_MINUTE)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
index 5dff774..ac996f6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
@@ -47,8 +47,9 @@ case class Extract(timeIntervalUnit: Expression, temporal: Expression) extends E
timeIntervalUnit match {
case SymbolExpression(TimeIntervalUnit.YEAR)
- | SymbolExpression(TimeIntervalUnit.MONTH)
| SymbolExpression(TimeIntervalUnit.QUARTER)
+ | SymbolExpression(TimeIntervalUnit.MONTH)
+ | SymbolExpression(TimeIntervalUnit.WEEK)
| SymbolExpression(TimeIntervalUnit.DAY)
if temporal.resultType == SqlTimeTypeInfo.DATE
|| temporal.resultType == SqlTimeTypeInfo.TIMESTAMP
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index 453de19..d1ff806 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -1443,12 +1443,24 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
"1996")
testAllApis(
+ 'f16.extract(TimeIntervalUnit.QUARTER),
+ "f16.extract(QUARTER)",
+ "EXTRACT(QUARTER FROM f16)",
+ "4")
+
+ testAllApis(
'f16.extract(TimeIntervalUnit.MONTH),
"extract(f16, MONTH)",
"EXTRACT(MONTH FROM f16)",
"11")
testAllApis(
+ 'f16.extract(TimeIntervalUnit.WEEK),
+ "extract(f16, WEEK)",
+ "EXTRACT(WEEK FROM f16)",
+ "45")
+
+ testAllApis(
'f16.extract(TimeIntervalUnit.DAY),
"f16.extract(DAY)",
"EXTRACT(DAY FROM f16)",
@@ -1461,12 +1473,30 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
"1996")
testAllApis(
+ 'f18.extract(TimeIntervalUnit.QUARTER),
+ "f18.extract(QUARTER)",
+ "EXTRACT(QUARTER FROM f18)",
+ "4")
+
+ testAllApis(
+ 'f16.extract(TimeIntervalUnit.QUARTER),
+ "f16.extract(QUARTER)",
+ "EXTRACT(QUARTER FROM f16)",
+ "4")
+
+ testAllApis(
'f18.extract(TimeIntervalUnit.MONTH),
"f18.extract(MONTH)",
"EXTRACT(MONTH FROM f18)",
"11")
testAllApis(
+ 'f18.extract(TimeIntervalUnit.WEEK),
+ "f18.extract(WEEK)",
+ "EXTRACT(WEEK FROM f18)",
+ "45")
+
+ testAllApis(
'f18.extract(TimeIntervalUnit.DAY),
"f18.extract(DAY)",
"EXTRACT(DAY FROM f18)",
@@ -1539,23 +1569,17 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
"1")
testAllApis(
+ 'f20.extract(TimeIntervalUnit.QUARTER),
+ "f20.extract(QUARTER)",
+ "EXTRACT(QUARTER FROM f20)",
+ "1")
+
+ testAllApis(
'f20.extract(TimeIntervalUnit.YEAR),
"f20.extract(YEAR)",
"EXTRACT(YEAR FROM f20)",
"2")
- testAllApis(
- 'f18.extract(TimeIntervalUnit.QUARTER),
- "f18.extract(QUARTER)",
- "EXTRACT(QUARTER FROM f18)",
- "4")
-
- testAllApis(
- 'f16.extract(TimeIntervalUnit.QUARTER),
- "f16.extract(QUARTER)",
- "EXTRACT(QUARTER FROM f16)",
- "4")
-
// test SQL only time units
testSqlApi(
"EXTRACT(MILLENNIUM FROM f18)",
@@ -1892,34 +1916,13 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
}
@Test
- def testQuarter(): Unit = {
- testAllApis(
- "1997-01-27".toDate.quarter(),
- "'1997-01-27'.toDate.quarter()",
- "QUARTER(DATE '1997-01-27')",
- "1")
-
- testAllApis(
- "1997-04-27".toDate.quarter(),
- "'1997-04-27'.toDate.quarter()",
- "QUARTER(DATE '1997-04-27')",
- "2")
-
- testAllApis(
- "1997-12-31".toDate.quarter(),
- "'1997-12-31'.toDate.quarter()",
- "QUARTER(DATE '1997-12-31')",
- "4")
- }
-
- @Test
def testTimestampAdd(): Unit = {
val data = Seq(
- (1, "TIMESTAMP '2017-11-29 22:58:58.998'"),
- (3, "TIMESTAMP '2017-11-29 22:58:58.998'"),
- (-1, "TIMESTAMP '2017-11-29 22:58:58.998'"),
- (-61, "TIMESTAMP '2017-11-29 22:58:58.998'"),
- (-1000, "TIMESTAMP '2017-11-29 22:58:58.998'")
+ (1, "2017-11-29 22:58:58.998"),
+ (3, "2017-11-29 22:58:58.998"),
+ (-1, "2017-11-29 22:58:58.998"),
+ (-61, "2017-11-29 22:58:58.998"),
+ (-1000, "2017-11-29 22:58:58.998")
)
val YEAR = Seq(
@@ -1998,27 +2001,60 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
"SQL_TSI_SECOND" -> SECOND
)
- for ((interval, result) <- intervalMapResults) {
- testSqlApi(
- s"TIMESTAMPADD($interval, ${data.head._1}, ${data.head._2})", result.head)
- testSqlApi(
- s"TIMESTAMPADD($interval, ${data(1)._1}, ${data(1)._2})", result(1))
- testSqlApi(
- s"TIMESTAMPADD($interval, ${data(2)._1}, ${data(2)._2})", result(2))
- testSqlApi(
- s"TIMESTAMPADD($interval, ${data(3)._1}, ${data(3)._2})", result(3))
- testSqlApi(
- s"TIMESTAMPADD($interval, ${data(4)._1}, ${data(4)._2})", result(4))
+ def intervalCount(interval: String, count: Int): (Expression, String) = interval match {
+ case "YEAR" => (count.years, s"$count.years")
+ case "SQL_TSI_YEAR" => (count.years, s"$count.years")
+ case "QUARTER" => (count.quarters, s"$count.quarters")
+ case "SQL_TSI_QUARTER" => (count.quarters, s"$count.quarters")
+ case "MONTH" => (count.months, s"$count.months")
+ case "SQL_TSI_MONTH" => (count.months, s"$count.months")
+ case "WEEK" => (count.weeks, s"$count.weeks")
+ case "SQL_TSI_WEEK" => (count.weeks, s"$count.weeks")
+ case "DAY" => (count.days, s"$count.days")
+ case "SQL_TSI_DAY" => (count.days, s"$count.days")
+ case "HOUR" => (count.hours, s"$count.hours")
+ case "SQL_TSI_HOUR" => (count.hours, s"$count.hours")
+ case "MINUTE" => (count.minutes, s"$count.minutes")
+ case "SQL_TSI_MINUTE" => (count.minutes, s"$count.minutes")
+ case "SECOND" => (count.seconds, s"$count.seconds")
+ case "SQL_TSI_SECOND" => (count.seconds, s"$count.seconds")
}
- testSqlApi("TIMESTAMPADD(HOUR, CAST(NULL AS INTEGER), TIMESTAMP '2016-02-24 12:42:25')", "null")
+ for ((interval, result) <- intervalMapResults) {
+ for (i <- 0 to 4) {
+ val (offset, ts) = data(i)
+ val timeInterval = intervalCount(interval, offset)
+ testAllApis(
+ timeInterval._1 + ts.toTimestamp,
+ s"${timeInterval._2} + '$ts'.toTimestamp",
+ s"TIMESTAMPADD($interval, $offset, TIMESTAMP '$ts')",
+ result(i))
+ }
+ }
- testSqlApi("TIMESTAMPADD(HOUR, -200, CAST(NULL AS TIMESTAMP))", "null")
+ testAllApis(
+ "2016-02-24 12:42:25".toTimestamp + Null(Types.INTERVAL_MILLIS),
+ "'2016-02-24 12:42:25'.toTimestamp + Null(INTERVAL_MILLIS)",
+ "TIMESTAMPADD(HOUR, CAST(NULL AS INTEGER), TIMESTAMP '2016-02-24 12:42:25')",
+ "null")
- testSqlApi("TIMESTAMPADD(DAY, 1, DATE '2016-06-15')", "2016-06-16")
+ testAllApis(
+ Null(Types.SQL_TIMESTAMP) + -200.hours,
+ "Null(SQL_TIMESTAMP) + -200.hours",
+ "TIMESTAMPADD(HOUR, -200, CAST(NULL AS TIMESTAMP))",
+ "null")
- testSqlApi("TIMESTAMPADD(MONTH, 3, CAST(NULL AS TIMESTAMP))", "null")
+ testAllApis(
+ "2016-06-15".toDate + 1.day,
+ "'2016-06-15'.toDate + 1.day",
+ "TIMESTAMPADD(DAY, 1, DATE '2016-06-15')",
+ "2016-06-16")
+ testAllApis(
+ Null(Types.SQL_TIMESTAMP) + 3.months,
+ "Null(SQL_TIMESTAMP) + 3.months",
+ "TIMESTAMPADD(MONTH, 3, CAST(NULL AS TIMESTAMP))",
+ "null")
}
// ----------------------------------------------------------------------------------------------