You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/09/25 17:13:35 UTC
[flink] branch master updated: [FLINK-6847] [FLINK-6813] [table]
Add support for TIMESTAMPDIFF in Table API & SQL
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c5ce970 [FLINK-6847] [FLINK-6813] [table] Add support for TIMESTAMPDIFF in Table API & SQL
c5ce970 is described below
commit c5ce970e781df60eb27b62446853eaa0579c8706
Author: xueyu <27...@qq.com>
AuthorDate: Sun Jul 8 18:50:06 2018 +0800
[FLINK-6847] [FLINK-6813] [table] Add support for TIMESTAMPDIFF in Table API & SQL
This closes #6282.
---
docs/dev/table/functions.md | 84 ++++++++++++++-
.../flink/table/api/scala/expressionDsl.scala | 29 +++++
.../apache/flink/table/codegen/CodeGenerator.scala | 4 +-
.../table/codegen/calls/FunctionGenerator.scala | 24 +++++
.../table/codegen/calls/ScalarOperators.scala | 34 ++++++
.../table/codegen/calls/TimestampDiffCallGen.scala | 118 +++++++++++++++++++++
.../flink/table/expressions/ExpressionParser.scala | 7 ++
.../flink/table/expressions/arithmetic.scala | 21 ++--
.../org/apache/flink/table/expressions/time.scala | 54 ++++++++++
.../flink/table/validate/FunctionCatalog.scala | 2 +
.../table/expressions/ScalarFunctionsTest.scala | 115 ++++++++++++++++++++
.../validation/ScalarFunctionsValidationTest.scala | 15 ++-
12 files changed, 494 insertions(+), 13 deletions(-)
diff --git a/docs/dev/table/functions.md b/docs/dev/table/functions.md
index 85768ab..1108294 100644
--- a/docs/dev/table/functions.md
+++ b/docs/dev/table/functions.md
@@ -3311,14 +3311,27 @@ DATE_FORMAT(timestamp, string)
<tr>
<td>
{% highlight text %}
-TIMESTAMPADD(unit, interval, timevalue)
+TIMESTAMPADD(timeintervalunit, interval, timepoint)
{% endhighlight %}
</td>
<td>
- <p>Returns a new time value that adds a (signed) integer interval to <i>timevalue</i>. The unit for <i>interval</i> is given by the unit argument, which should be one of the following values: <code>SECOND</code>, <code>MINUTE</code>, <code>HOUR</code>, <code>DAY</code>, <code>WEEK</code>, <code>MONTH</code>, <code>QUARTER</code>, or <code>YEAR</code>.</p>
+ <p>Returns a new time value that adds a (signed) integer interval to <i>timepoint</i>. The unit for <i>interval</i> is given by the <i>timeintervalunit</i> argument, which should be one of the following values: <code>SECOND</code>, <code>MINUTE</code>, <code>HOUR</code>, <code>DAY</code>, <code>WEEK</code>, <code>MONTH</code>, <code>QUARTER</code>, or <code>YEAR</code>.</p>
<p>E.g., <code>TIMESTAMPADD(WEEK, 1, DATE '2003-01-02')</code> returns <code>2003-01-09</code>.</p>
</td>
</tr>
+
+ <tr>
+ <td>
+ {% highlight text %}
+TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2)
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Returns the (signed) number of <i>timepointunit</i> between <i>timepoint1</i> and <i>timepoint2</i>, computed as <i>timepoint2</i> minus <i>timepoint1</i>. The unit for the interval is given by the first argument, which should be one of the following values: <code>SECOND</code>, <code>MINUTE</code>, <code>HOUR</code>, <code>DAY</code>, <code>WEEK</code>, <code>MONTH</code>, <code>QUARTER</code>, or <code>YEAR</code>. See also the <a href="#time-interval-and-point-unit-specifiers">Time Interval and Point Unit Specifiers table</a>.</p>
+ <p>E.g., <code>TIMESTAMPDIFF(DAY, TIMESTAMP '2003-01-02 10:00:00', TIMESTAMP '2003-01-03 10:00:00')</code> leads to <code>1</code>.</p>
+ </td>
+ </tr>
+
</tbody>
</table>
</div>
@@ -3564,6 +3577,19 @@ dateFormat(TIMESTAMP, STRING)
<p>E.g., <code>dateFormat(ts, '%Y, %d %M')</code> results in strings formatted as "2017, 05 May".</p>
</td>
</tr>
+
+ <tr>
+ <td>
+ {% highlight java %}
+timestampDiff(TIMEPOINTUNIT, TIMEPOINT1, TIMEPOINT2)
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Returns the (signed) number of <i>TIMEPOINTUNIT</i> between <i>TIMEPOINT1</i> and <i>TIMEPOINT2</i>, computed as <i>TIMEPOINT2</i> minus <i>TIMEPOINT1</i>. The unit for the interval is given by the first argument, which should be one of the following values: <code>SECOND</code>, <code>MINUTE</code>, <code>HOUR</code>, <code>DAY</code>, <code>WEEK</code>, <code>MONTH</code>, <code>QUARTER</code>, or <code>YEAR</code>. See also the <a href="#time-interval-and-point-unit-specifiers">Time Interval and Point Unit Specifiers table</a>.</p>
+ <p>E.g., <code>timestampDiff(DAY, '2003-01-02 10:00:00'.toTimestamp, '2003-01-03 10:00:00'.toTimestamp)</code> leads to <code>1</code>.</p>
+ </td>
+ </tr>
+
</tbody>
</table>
</div>
@@ -3809,6 +3835,19 @@ dateFormat(TIMESTAMP, STRING)
<p>E.g., <code>dateFormat('ts, "%Y, %d %M")</code> results in strings formatted as "2017, 05 May".</p>
</td>
</tr>
+
+ <tr>
+ <td>
+ {% highlight scala %}
+timestampDiff(TIMEPOINTUNIT, TIMEPOINT1, TIMEPOINT2)
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Returns the (signed) number of <i>TIMEPOINTUNIT</i> between <i>TIMEPOINT1</i> and <i>TIMEPOINT2</i>, computed as <i>TIMEPOINT2</i> minus <i>TIMEPOINT1</i>. The unit for the interval is given by the first argument, which should be one of the following values: <code>SECOND</code>, <code>MINUTE</code>, <code>HOUR</code>, <code>DAY</code>, <code>WEEK</code>, <code>MONTH</code>, <code>QUARTER</code>, or <code>YEAR</code>. See also the <a href="#time-interval-and-point-unit-specifiers">Time Interval and Point Unit Specifiers table</a>.</p>
+ <p>E.g., <code>timestampDiff(TimePointUnit.DAY, '2003-01-02 10:00:00'.toTimestamp, '2003-01-03 10:00:00'.toTimestamp)</code> leads to <code>1</code>.</p>
+ </td>
+ </tr>
+
</tbody>
</table>
</div>
@@ -5463,3 +5502,44 @@ The following table lists specifiers for date format functions.
</table>
{% top %}
+
+Time Interval and Point Unit Specifiers
+---------------------------------------
+
+The following table lists specifiers for time interval and time point units.
+
+For Table API, please use `_` for spaces (e.g., `DAY_TO_HOUR`).
+
+| Time Interval Unit | Time Point Unit |
+| :----------------------- | :----------------------------- |
+| `MILLENNIUM` _(SQL-only)_ | |
+| `CENTURY` _(SQL-only)_ | |
+| `YEAR` | `YEAR` |
+| `YEAR TO MONTH` | |
+| `QUARTER` | `QUARTER` |
+| `MONTH` | `MONTH` |
+| `WEEK` | `WEEK` |
+| `DAY` | `DAY` |
+| `DAY TO HOUR` | |
+| `DAY TO MINUTE` | |
+| `DAY TO SECOND` | |
+| `HOUR` | `HOUR` |
+| `HOUR TO MINUTE` | |
+| `HOUR TO SECOND` | |
+| `MINUTE` | `MINUTE` |
+| `MINUTE TO SECOND` | |
+| `SECOND` | `SECOND` |
+| | `MILLISECOND` |
+| | `MICROSECOND` |
+| `DOY` _(SQL-only)_ | |
+| `DOW` _(SQL-only)_ | |
+| | `SQL_TSI_YEAR` _(SQL-only)_ |
+| | `SQL_TSI_QUARTER` _(SQL-only)_ |
+| | `SQL_TSI_MONTH` _(SQL-only)_ |
+| | `SQL_TSI_WEEK` _(SQL-only)_ |
+| | `SQL_TSI_DAY` _(SQL-only)_ |
+| | `SQL_TSI_HOUR` _(SQL-only)_ |
+| | `SQL_TSI_MINUTE` _(SQL-only)_ |
+| | `SQL_TSI_SECOND` _(SQL-only)_ |
+
+{% top %}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index 626012c..14adb71 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -26,6 +26,7 @@ import org.apache.flink.table.api.{CurrentRange, CurrentRow, TableException, Unb
import org.apache.flink.table.expressions.ExpressionUtils.{convertArray, toMilliInterval, toMonthInterval, toRowInterval}
import org.apache.flink.table.api.Table
import org.apache.flink.table.expressions.TimeIntervalUnit.TimeIntervalUnit
+import org.apache.flink.table.expressions.TimePointUnit.TimePointUnit
import org.apache.flink.table.expressions._
import org.apache.flink.table.functions.{AggregateFunction, DistinctAggregateFunction}
@@ -1142,6 +1143,34 @@ object dateFormat {
}
/**
+ * Returns the (signed) number of [[TimePointUnit]] between timePoint1 and timePoint2.
+ *
+ * For example, `timestampDiff(TimePointUnit.DAY, '2016-06-15'.toDate, '2016-06-18'.toDate)`
+ * leads to 3.
+ */
+object timestampDiff {
+
+  /**
+   * Returns the (signed) number of [[TimePointUnit]] between timePoint1 and timePoint2,
+   * counted from timePoint1 towards timePoint2.
+   *
+   * For example, `timestampDiff(TimePointUnit.DAY, '2016-06-15'.toDate, '2016-06-18'.toDate)`
+   * leads to 3.
+   *
+   * @param timePointUnit The unit in which the difference is measured.
+   * @param timePoint1 The first point in time.
+   * @param timePoint2 The second point in time.
+   * @return An expression evaluating to the number of units as an integer value.
+   */
+  def apply(timePointUnit: TimePointUnit, timePoint1: Expression, timePoint2: Expression)
+    : Expression =
+    TimestampDiff(timePointUnit, timePoint1, timePoint2)
+}
+
+/**
* Creates an array of literals. The array will be an array of objects (not primitives).
*/
object array {
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index 0fdd885..19e030b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -769,7 +769,7 @@ abstract class CodeGenerator(
val right = operands(1)
requireTemporal(left)
requireTemporal(right)
- generateTemporalPlusMinus(plus = true, nullCheck, left, right, config)
+ generateTemporalPlusMinus(plus = true, nullCheck, resultType, left, right, config)
case MINUS if isNumeric(resultType) =>
val left = operands.head
@@ -783,7 +783,7 @@ abstract class CodeGenerator(
val right = operands(1)
requireTemporal(left)
requireTemporal(right)
- generateTemporalPlusMinus(plus = false, nullCheck, left, right, config)
+ generateTemporalPlusMinus(plus = false, nullCheck, resultType, left, right, config)
case MULTIPLY if isNumeric(resultType) =>
val left = operands.head
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index b6eb3e8..e320052 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.codegen.calls
import java.lang.reflect.Method
+import org.apache.calcite.avatica.util.TimeUnit
import org.apache.calcite.avatica.util.TimeUnitRange
import org.apache.calcite.sql.SqlOperator
import org.apache.calcite.sql.fun.SqlStdOperatorTable._
@@ -532,6 +533,29 @@ object FunctionGenerator {
new ExtractCallGen(LONG_TYPE_INFO, BuiltInMethod.UNIX_DATE_EXTRACT.method))
addSqlFunction(
+ TIMESTAMP_DIFF,
+ Seq(
+ new GenericTypeInfo(classOf[TimeUnit]),
+ SqlTimeTypeInfo.TIMESTAMP,
+ SqlTimeTypeInfo.TIMESTAMP),
+ new TimestampDiffCallGen)
+
+ addSqlFunction(
+ TIMESTAMP_DIFF,
+ Seq(new GenericTypeInfo(classOf[TimeUnit]), SqlTimeTypeInfo.TIMESTAMP, SqlTimeTypeInfo.DATE),
+ new TimestampDiffCallGen)
+
+ addSqlFunction(
+ TIMESTAMP_DIFF,
+ Seq(new GenericTypeInfo(classOf[TimeUnit]), SqlTimeTypeInfo.DATE, SqlTimeTypeInfo.TIMESTAMP),
+ new TimestampDiffCallGen)
+
+ addSqlFunction(
+ TIMESTAMP_DIFF,
+ Seq(new GenericTypeInfo(classOf[TimeUnit]), SqlTimeTypeInfo.DATE, SqlTimeTypeInfo.DATE),
+ new TimestampDiffCallGen)
+
+ addSqlFunction(
FLOOR,
Seq(SqlTimeTypeInfo.DATE, new GenericTypeInfo(classOf[TimeUnitRange])),
new FloorCeilCallGen(
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
index 57f1618..282b167 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
@@ -825,6 +825,7 @@ object ScalarOperators {
def generateTemporalPlusMinus(
plus: Boolean,
nullCheck: Boolean,
+ resultType: TypeInformation[_],
left: GeneratedExpression,
right: GeneratedExpression,
config: TableConfig)
@@ -833,6 +834,7 @@ object ScalarOperators {
val op = if (plus) "+" else "-"
(left.resultType, right.resultType) match {
+ // arithmetic of time point and time interval
case (l: TimeIntervalTypeInfo[_], r: TimeIntervalTypeInfo[_]) if l == r =>
generateArithmeticOperator(op, nullCheck, l, left, right, config)
@@ -861,6 +863,38 @@ object ScalarOperators {
(l, r) => s"${qualifyMethod(BuiltInMethod.ADD_MONTHS.method)}($l, $op($r))"
}
+ // minus arithmetic of time points (i.e. for TIMESTAMPDIFF)
+ case (l: SqlTimeTypeInfo[_], r: SqlTimeTypeInfo[_]) if !plus =>
+ resultType match {
+ case TimeIntervalTypeInfo.INTERVAL_MONTHS =>
+ generateOperatorIfNotNull(nullCheck, resultType, left, right) {
+ (ll, rr) => (l, r) match {
+ case (SqlTimeTypeInfo.TIMESTAMP, SqlTimeTypeInfo.DATE) =>
+ s"${qualifyMethod(BuiltInMethod.SUBTRACT_MONTHS.method)}" +
+ s"($ll, $rr * ${MILLIS_PER_DAY}L)"
+ case (SqlTimeTypeInfo.DATE, SqlTimeTypeInfo.TIMESTAMP) =>
+ s"${qualifyMethod(BuiltInMethod.SUBTRACT_MONTHS.method)}" +
+ s"($ll * ${MILLIS_PER_DAY}L, $rr)"
+ case _ =>
+ s"${qualifyMethod(BuiltInMethod.SUBTRACT_MONTHS.method)}($ll, $rr)"
+ }
+ }
+
+ case TimeIntervalTypeInfo.INTERVAL_MILLIS =>
+ generateOperatorIfNotNull(nullCheck, resultType, left, right) {
+ (ll, rr) => (l, r) match {
+ case (SqlTimeTypeInfo.TIMESTAMP, SqlTimeTypeInfo.TIMESTAMP) =>
+ s"$ll $op $rr"
+ case (SqlTimeTypeInfo.DATE, SqlTimeTypeInfo.DATE) =>
+ s"($ll * ${MILLIS_PER_DAY}L) $op ($rr * ${MILLIS_PER_DAY}L)"
+ case (SqlTimeTypeInfo.TIMESTAMP, SqlTimeTypeInfo.DATE) =>
+ s"$ll $op ($rr * ${MILLIS_PER_DAY}L)"
+ case (SqlTimeTypeInfo.DATE, SqlTimeTypeInfo.TIMESTAMP) =>
+ s"($ll * ${MILLIS_PER_DAY}L) $op $rr"
+ }
+ }
+ }
+
case _ =>
throw new CodeGenException("Unsupported temporal arithmetic.")
}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TimestampDiffCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TimestampDiffCallGen.scala
new file mode 100644
index 0000000..5a7cd4f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TimestampDiffCallGen.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen.calls
+
+import org.apache.calcite.avatica.util.DateTimeUtils.MILLIS_PER_DAY
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.util.BuiltInMethod
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
+import org.apache.flink.table.codegen.CodeGenUtils._
+import org.apache.flink.table.codegen.calls.CallGenerator.generateCallIfArgsNotNull
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, GeneratedExpression}
+
+/**
+ * Generates code for the `TIMESTAMP_DIFF` call created by the Table API's `timestampDiff`.
+ *
+ * Operands: the [[TimeUnit]] symbol, followed by two time points that must each be of type
+ * DATE or TIMESTAMP. The generated INT expression measures `operands(1) - operands(2)` in the
+ * requested unit. Month-based units (YEAR, QUARTER, MONTH) go through Calcite's
+ * `SUBTRACT_MONTHS`. The remaining units divide the millisecond difference by the unit's
+ * multiplier using integer division.
+ */
+class TimestampDiffCallGen extends CallGenerator {
+
+  override def generate(
+      codeGenerator: CodeGenerator,
+      operands: Seq[GeneratedExpression])
+    : GeneratedExpression = {
+
+    val unit = getEnum(operands.head).asInstanceOf[TimeUnit]
+    val timePoint1 = operands(1)
+    val timePoint2 = operands(2)
+
+    unit match {
+      case TimeUnit.YEAR | TimeUnit.MONTH | TimeUnit.QUARTER =>
+        requireDateOrTimestamp(unit, timePoint1, timePoint2)
+        // SUBTRACT_MONTHS needs both arguments in the same representation. When the operand
+        // types differ, the DATE side is converted to epoch milliseconds to match the TIMESTAMP.
+        val mixedTypes = timePoint1.resultType != timePoint2.resultType
+        generateCallIfArgsNotNull(codeGenerator.nullCheck, INT_TYPE_INFO, operands) {
+          terms =>
+            val left = if (mixedTypes) toMillis(terms(1), timePoint1) else terms(1)
+            val right = if (mixedTypes) toMillis(terms(2), timePoint2) else terms(2)
+            // for these units Avatica's TimeUnit.multiplier is a number of months
+            s"${qualifyMethod(BuiltInMethod.SUBTRACT_MONTHS.method)}($left, $right) / " +
+              s"${unit.multiplier.intValue()}"
+        }
+
+      case TimeUnit.WEEK | TimeUnit.DAY | TimeUnit.HOUR | TimeUnit.MINUTE | TimeUnit.SECOND =>
+        requireDateOrTimestamp(unit, timePoint1, timePoint2)
+        // both sides are normalized to epoch milliseconds before dividing by the unit's
+        // multiplier; the cast truncates towards zero
+        generateCallIfArgsNotNull(codeGenerator.nullCheck, INT_TYPE_INFO, operands) {
+          terms =>
+            s"(int)((${toMillis(terms(1), timePoint1)} - ${toMillis(terms(2), timePoint2)}) / " +
+              s"${unit.multiplier.intValue()})"
+        }
+
+      case _ =>
+        throw new CodeGenException(
+          "Unit '" + unit + "' can not be applied to the timestamp difference function.")
+    }
+  }
+
+  /** Converts a DATE term (days) to epoch milliseconds; TIMESTAMP terms are returned as is. */
+  private def toMillis(term: String, timePoint: GeneratedExpression): String =
+    if (timePoint.resultType == SqlTimeTypeInfo.DATE) s"($term * ${MILLIS_PER_DAY}L)" else term
+
+  /**
+   * Rejects operand types without a TIMESTAMPDIFF implementation with a [[CodeGenException]]
+   * instead of letting them fall through to an unhandled match.
+   */
+  private def requireDateOrTimestamp(
+      unit: TimeUnit,
+      timePoint1: GeneratedExpression,
+      timePoint2: GeneratedExpression)
+    : Unit = {
+    def supported(timePoint: GeneratedExpression): Boolean =
+      timePoint.resultType == SqlTimeTypeInfo.DATE ||
+        timePoint.resultType == SqlTimeTypeInfo.TIMESTAMP
+
+    if (!supported(timePoint1) || !supported(timePoint2)) {
+      throw new CodeGenException(
+        s"Unit '$unit' can not be applied to the timestamp difference function for operands " +
+          s"of type '${timePoint1.resultType}' and '${timePoint2.resultType}'.")
+    }
+  }
+}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
index c82d556..0b84f94 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
@@ -59,6 +59,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
lazy val TO_TIMESTAMP: Keyword = Keyword("toTimestamp")
lazy val TRIM: Keyword = Keyword("trim")
lazy val EXTRACT: Keyword = Keyword("extract")
+ lazy val TIMESTAMP_DIFF: Keyword = Keyword("timestampDiff")
lazy val FLOOR: Keyword = Keyword("floor")
lazy val CEIL: Keyword = Keyword("ceil")
lazy val LOG: Keyword = Keyword("log")
@@ -392,6 +393,11 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
case _ ~ _ ~ operand ~ _ ~ unit ~ _ => Extract(unit, operand)
}
+  // timestampDiff(<time point unit>, <expression>, <expression>); the keyword, parentheses
+  // and commas are matched but dropped, keeping only the unit and both operands
+  lazy val prefixTimestampDiff: PackratParser[Expression] =
+    (TIMESTAMP_DIFF ~ "(") ~> timePointUnit ~
+      ("," ~> expression) ~ ("," ~> expression) <~ ")" ^^ {
+      case unit ~ timePoint1 ~ timePoint2 => TimestampDiff(unit, timePoint1, timePoint2)
+    }
+
lazy val prefixFloor: PackratParser[Expression] =
FLOOR ~ "(" ~ expression ~ "," ~ timeIntervalUnit ~ ")" ^^ {
case _ ~ _ ~ operand ~ _ ~ unit ~ _ => TemporalFloor(unit, operand)
@@ -437,6 +443,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
prefixAs| prefixToTimestamp | prefixToTime | prefixToDate |
// expressions that take enumerations
prefixCast | prefixTrim | prefixTrimWithoutArgs | prefixExtract | prefixFloor | prefixCeil |
+ prefixTimestampDiff |
// expressions that take literals
prefixGet |
// expression with special identifier
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/arithmetic.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/arithmetic.scala
index 28049e8..3dc8899 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/arithmetic.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/arithmetic.scala
@@ -42,11 +42,10 @@ abstract class BinaryArithmetic extends BinaryExpression {
throw new RuntimeException("This should never happen.")
}
- // TODO: tighten this rule once we implemented type coercion rules during validation
override private[flink] def validateInput(): ValidationResult = {
if (!isNumeric(left.resultType) || !isNumeric(right.resultType)) {
- ValidationFailure(s"$this requires both operands Numeric, get " +
- s"$left : ${left.resultType} and $right : ${right.resultType}")
+ ValidationFailure(s"The arithmetic '$this' requires both operands to be numeric, but was " +
+ s"'$left' : '${left.resultType}' and '$right' : '${right.resultType}'.")
} else {
ValidationSuccess
}
@@ -93,9 +92,9 @@ case class Plus(left: Expression, right: Expression) extends BinaryArithmetic {
ValidationSuccess
} else {
ValidationFailure(
- s"$this requires Numeric, String, Intervals of same type, " +
- s"or Interval and a time point input, " +
- s"get $left : ${left.resultType} and $right : ${right.resultType}")
+ s"The arithmetic '$this' requires input that is numeric, string, time intervals of the " +
+ s"same type, or a time interval and a time point type, " +
+ s"but was '$left' : '${left.resultType}' and '$right' : '${right.resultType}'.")
}
}
}
@@ -115,7 +114,8 @@ case class UnaryMinus(child: Expression) extends UnaryExpression {
} else if (isTimeInterval(child.resultType)) {
ValidationSuccess
} else {
- ValidationFailure(s"$this requires Numeric, or Interval input, get ${child.resultType}")
+ ValidationFailure(s"The arithmetic '$this' requires input that is numeric or a time " +
+ s"interval type, but was '${child.resultType}'.")
}
}
}
@@ -132,8 +132,13 @@ case class Minus(left: Expression, right: Expression) extends BinaryArithmetic {
ValidationSuccess
} else if (isTimeInterval(left.resultType) && isTimePoint(right.resultType)) {
ValidationSuccess
+ } else if (isNumeric(left.resultType) && isNumeric(right.resultType)) {
+ ValidationSuccess
} else {
- super.validateInput()
+ ValidationFailure(
+ s"The arithmetic '$this' requires inputs that are numeric, time intervals of the same " +
+ s"type, or a time interval and a time point type, " +
+ s"but was '$left' : '${left.resultType}' and '$right' : '${right.resultType}'.")
}
}
}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
index ac996f6..b17537e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
@@ -340,3 +340,57 @@ case class DateFormat(timestamp: Expression, format: Expression) extends Express
override private[flink] def resultType = STRING_TYPE_INFO
}
+
+/**
+ * Expression for `timestampDiff(unit, timePoint1, timePoint2)`: the (signed) number of
+ * `timePointUnit` units from `timePoint1` to `timePoint2`, as an INT.
+ *
+ * Both time points must be of type DATE or TIMESTAMP. The unit must be one of YEAR,
+ * QUARTER, MONTH, WEEK, DAY, HOUR, MINUTE or SECOND.
+ *
+ * @param timePointUnit symbol expression holding the [[TimePointUnit]]
+ * @param timePoint1 the first point in time
+ * @param timePoint2 the second point in time
+ */
+case class TimestampDiff(
+    timePointUnit: Expression,
+    timePoint1: Expression,
+    timePoint2: Expression)
+  extends Expression {
+
+  override private[flink] def children: Seq[Expression] =
+    timePointUnit :: timePoint1 :: timePoint2 :: Nil
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (!TypeCheckUtils.isTimePoint(timePoint1.resultType)) {
+      ValidationFailure(
+        s"$this requires an input time point type, " +
+        s"but timePoint1 is of type '${timePoint1.resultType}'.")
+    } else if (!TypeCheckUtils.isTimePoint(timePoint2.resultType)) {
+      ValidationFailure(
+        s"$this requires an input time point type, " +
+        s"but timePoint2 is of type '${timePoint2.resultType}'.")
+    } else {
+      timePointUnit match {
+        // BOTH operands must be DATE or TIMESTAMP: a TIME operand has no TIMESTAMPDIFF code
+        // generator, so it must be rejected here rather than failing later in code generation
+        case SymbolExpression(TimePointUnit.YEAR)
+             | SymbolExpression(TimePointUnit.QUARTER)
+             | SymbolExpression(TimePointUnit.MONTH)
+             | SymbolExpression(TimePointUnit.WEEK)
+             | SymbolExpression(TimePointUnit.DAY)
+             | SymbolExpression(TimePointUnit.HOUR)
+             | SymbolExpression(TimePointUnit.MINUTE)
+             | SymbolExpression(TimePointUnit.SECOND)
+          if isDateOrTimestamp(timePoint1) && isDateOrTimestamp(timePoint2) =>
+          ValidationSuccess
+
+        case _ =>
+          ValidationFailure(s"$this operator does not support unit '$timePointUnit'" +
+            s" for input of type ('${timePoint1.resultType}', '${timePoint2.resultType}').")
+      }
+    }
+  }
+
+  /** Whether the given time point is of one of the types TIMESTAMPDIFF is implemented for. */
+  private def isDateOrTimestamp(timePoint: Expression): Boolean =
+    timePoint.resultType == SqlTimeTypeInfo.DATE ||
+      timePoint.resultType == SqlTimeTypeInfo.TIMESTAMP
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    // The operands are deliberately passed in reverse order. The generated code computes
+    // operand(1) - operand(2), so the result is timePoint2 - timePoint1.
+    relBuilder
+      .getRexBuilder
+      .makeCall(SqlStdOperatorTable.TIMESTAMP_DIFF,
+        Seq(timePointUnit.toRexNode, timePoint2.toRexNode, timePoint1.toRexNode))
+  }
+
+  override def toString: String = s"timestampDiff(${children.mkString(", ")})"
+
+  override private[flink] def resultType = INT_TYPE_INFO
+}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index c60f979..801d2e9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -257,6 +257,7 @@ object FunctionCatalog {
"temporalOverlaps" -> classOf[TemporalOverlaps],
"dateTimePlus" -> classOf[Plus],
"dateFormat" -> classOf[DateFormat],
+ "timestampDiff" -> classOf[TimestampDiff],
// item
"at" -> classOf[ItemAt],
@@ -446,6 +447,7 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
ScalarSqlFunctions.BIN,
ScalarSqlFunctions.HEX,
SqlStdOperatorTable.TIMESTAMP_ADD,
+ SqlStdOperatorTable.TIMESTAMP_DIFF,
ScalarSqlFunctions.LOG,
ScalarSqlFunctions.LPAD,
ScalarSqlFunctions.RPAD,
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index e8708cb..7614f19 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -2248,6 +2248,121 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
}
@Test
+  def testTimestampDiff(): Unit = {
+    // Keyed by (unit keyword, Table API unit, SQL_TSI_* alias). Each value lists exactly four
+    // (timePoint1, timePoint2, expected) cases whose operand types are, in order:
+    // (TIMESTAMP, TIMESTAMP), (DATE, TIMESTAMP), (TIMESTAMP, DATE) and (DATE, DATE).
+    val dataMap = Map(
+      ("DAY", TimePointUnit.DAY, "SQL_TSI_DAY") -> Seq(
+        ("2018-07-03 11:11:11", "2018-07-05 11:11:11", "2"),
+        ("2016-06-15", "2016-06-16 11:11:11", "1"),
+        ("2016-06-15 11:00:00", "2016-06-19", "3"),
+        ("2016-06-15", "2016-06-18", "3")
+      ),
+      ("HOUR", TimePointUnit.HOUR, "SQL_TSI_HOUR") -> Seq(
+        ("2018-07-03 11:11:11", "2018-07-04 12:12:11", "25"),
+        ("2016-06-15", "2016-06-16 11:11:11", "35"),
+        ("2016-06-15 11:00:00", "2016-06-19", "85"),
+        ("2016-06-15", "2016-06-12", "-72")
+      ),
+      ("MINUTE", TimePointUnit.MINUTE, "SQL_TSI_MINUTE") -> Seq(
+        ("2018-07-03 11:11:11", "2018-07-03 12:10:11", "59"),
+        ("2016-06-15", "2016-06-16 11:11:11", "2111"),
+        ("2016-06-15 11:00:00", "2016-06-19", "5100"),
+        ("2016-06-15", "2016-06-18", "4320")
+      ),
+      ("SECOND", TimePointUnit.SECOND, "SQL_TSI_SECOND") -> Seq(
+        ("2018-07-03 11:11:11", "2018-07-03 11:12:12", "61"),
+        ("2016-06-15", "2016-06-16 11:11:11", "126671"),
+        ("2016-06-15 11:00:00", "2016-06-19", "306000"),
+        ("2016-06-15", "2016-06-18", "259200")
+      ),
+      ("WEEK", TimePointUnit.WEEK, "SQL_TSI_WEEK") -> Seq(
+        ("2018-05-03 11:11:11", "2018-07-03 11:12:12", "8"),
+        ("2016-04-15", "2016-07-16 11:11:11", "13"),
+        ("2016-04-15 11:00:00", "2016-09-19", "22"),
+        ("2016-08-15", "2016-06-18", "-8")
+      ),
+      ("MONTH", TimePointUnit.MONTH, "SQL_TSI_MONTH") -> Seq(
+        ("2018-07-03 11:11:11", "2018-09-05 11:11:11", "2"),
+        ("2016-06-15", "2018-06-16 11:11:11", "24"),
+        ("2016-06-15 11:00:00", "2018-05-19", "23"),
+        ("2016-06-15", "2018-03-18", "21")
+      ),
+      ("QUARTER", TimePointUnit.QUARTER, "SQL_TSI_QUARTER") -> Seq(
+        ("2018-01-03 11:11:11", "2018-09-05 11:11:11", "2"),
+        ("2016-06-15", "2018-06-16 11:11:11", "8"),
+        ("2016-06-15 11:00:00", "2018-05-19", "7"),
+        ("2016-06-15", "2018-03-18", "7")
+      )
+    )
+
+    // Per operand type: Table API conversion, its name in the expression string, SQL literal.
+    val timestampOperand = ((s: String) => s.toTimestamp, "toTimestamp", "TIMESTAMP")
+    val dateOperand = ((s: String) => s.toDate, "toDate", "DATE")
+    // aligned position-by-position with the four cases of every dataMap entry
+    val operandTypes = Seq(
+      (timestampOperand, timestampOperand),
+      (dateOperand, timestampOperand),
+      (timestampOperand, dateOperand),
+      (dateOperand, dateOperand))
+
+    for {
+      ((unitName, timePointUnit, tsiUnitName), cases) <- dataMap
+      ((start, end, expected), (operand1, operand2)) <- cases.zip(operandTypes)
+    } {
+      val (toExpr1, method1, sqlType1) = operand1
+      val (toExpr2, method2, sqlType2) = operand2
+      testAllApis(
+        timestampDiff(timePointUnit, toExpr1(start), toExpr2(end)),
+        s"timestampDiff($unitName, '$start'.$method1, '$end'.$method2)",
+        s"TIMESTAMPDIFF($unitName, $sqlType1 '$start', $sqlType2 '$end')",
+        expected
+      )
+      testSqlApi( // same case with the SQL_TSI_* alias of the unit
+        s"TIMESTAMPDIFF($tsiUnitName, $sqlType1 '$start', $sqlType2 '$end')",
+        expected
+      )
+    }
+
+    // a NULL operand on either side yields NULL
+    testAllApis(
+      timestampDiff(TimePointUnit.DAY, Null(Types.SQL_TIMESTAMP),
+        "2016-02-24 12:42:25".toTimestamp),
+      "timestampDiff(DAY, Null(SQL_TIMESTAMP), '2016-02-24 12:42:25'.toTimestamp)",
+      "TIMESTAMPDIFF(DAY, CAST(NULL AS TIMESTAMP), TIMESTAMP '2016-02-24 12:42:25')",
+      "null"
+    )
+
+    testAllApis(
+      timestampDiff(TimePointUnit.DAY, "2016-02-24 12:42:25".toTimestamp,
+        Null(Types.SQL_TIMESTAMP)),
+      "timestampDiff(DAY, '2016-02-24 12:42:25'.toTimestamp, Null(SQL_TIMESTAMP))",
+      "TIMESTAMPDIFF(DAY, TIMESTAMP '2016-02-24 12:42:25', CAST(NULL AS TIMESTAMP))",
+      "null"
+    )
+  }
+
+ @Test
def testTimestampAdd(): Unit = {
val data = Seq(
(1, "2017-11-29 22:58:58.998"),
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala
index 05e4e9b..8a5691d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala
@@ -19,8 +19,9 @@
package org.apache.flink.table.expressions.validation
import org.apache.calcite.avatica.util.TimeUnit
-import org.apache.flink.table.api.{SqlParserException, ValidationException}
import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{SqlParserException, ValidationException}
+import org.apache.flink.table.expressions.TimePointUnit
import org.apache.flink.table.expressions.utils.ScalarTypesTestBase
import org.junit.Test
@@ -99,6 +100,18 @@ class ScalarFunctionsValidationTest extends ScalarTypesTestBase {
}
@Test(expected = classOf[ValidationException])
+  def testTimestampDiffWithWrongTime(): Unit = {
+    // both operands are plain string literals rather than time points
+    val invalidDiff = timestampDiff(TimePointUnit.DAY, "2016-02-24", "2016-02-27")
+    testTableApi(invalidDiff, "FAIL", "FAIL")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testTimestampDiffWithWrongTimeAndUnit(): Unit = {
+    // same as above, but with a sub-day unit
+    val invalidDiff = timestampDiff(TimePointUnit.MINUTE, "2016-02-24", "2016-02-27")
+    testTableApi(invalidDiff, "FAIL", "FAIL")
+  }
+
+ @Test(expected = classOf[ValidationException])
def testDOWWithTimeWhichIsUnsupported(): Unit = {
testSqlApi("EXTRACT(DOW FROM TIME '12:42:25')", "0")
}