You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/09/25 17:13:35 UTC

[flink] branch master updated: [FLINK-6847] [FLINK-6813] [table] Add support for TIMESTAMPDIFF in Table API & SQL

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c5ce970  [FLINK-6847] [FLINK-6813] [table] Add support for TIMESTAMPDIFF in Table API & SQL
c5ce970 is described below

commit c5ce970e781df60eb27b62446853eaa0579c8706
Author: xueyu <27...@qq.com>
AuthorDate: Sun Jul 8 18:50:06 2018 +0800

    [FLINK-6847] [FLINK-6813] [table] Add support for TIMESTAMPDIFF in Table API & SQL
    
    This closes #6282.
---
 docs/dev/table/functions.md                        |  84 ++++++++++++++-
 .../flink/table/api/scala/expressionDsl.scala      |  29 +++++
 .../apache/flink/table/codegen/CodeGenerator.scala |   4 +-
 .../table/codegen/calls/FunctionGenerator.scala    |  24 +++++
 .../table/codegen/calls/ScalarOperators.scala      |  34 ++++++
 .../table/codegen/calls/TimestampDiffCallGen.scala | 118 +++++++++++++++++++++
 .../flink/table/expressions/ExpressionParser.scala |   7 ++
 .../flink/table/expressions/arithmetic.scala       |  21 ++--
 .../org/apache/flink/table/expressions/time.scala  |  54 ++++++++++
 .../flink/table/validate/FunctionCatalog.scala     |   2 +
 .../table/expressions/ScalarFunctionsTest.scala    | 115 ++++++++++++++++++++
 .../validation/ScalarFunctionsValidationTest.scala |  15 ++-
 12 files changed, 494 insertions(+), 13 deletions(-)

diff --git a/docs/dev/table/functions.md b/docs/dev/table/functions.md
index 85768ab..1108294 100644
--- a/docs/dev/table/functions.md
+++ b/docs/dev/table/functions.md
@@ -3311,14 +3311,27 @@ DATE_FORMAT(timestamp, string)
     <tr>
       <td>
         {% highlight text %}
-TIMESTAMPADD(unit, interval, timevalue)
+TIMESTAMPADD(timeintervalunit, interval, timepoint)
 {% endhighlight %}
       </td>
       <td>
-        <p>Returns a new time value that adds a (signed) integer interval to <i>timevalue</i>. The unit for <i>interval</i> is given by the unit argument, which should be one of the following values: <code>SECOND</code>, <code>MINUTE</code>, <code>HOUR</code>, <code>DAY</code>, <code>WEEK</code>, <code>MONTH</code>, <code>QUARTER</code>, or <code>YEAR</code>.</p> 
+        <p>Returns a new time value that adds a (signed) integer interval to <i>timepoint</i>. The unit for <i>interval</i> is given by <i>timeintervalunit</i>, which should be one of the following values: <code>SECOND</code>, <code>MINUTE</code>, <code>HOUR</code>, <code>DAY</code>, <code>WEEK</code>, <code>MONTH</code>, <code>QUARTER</code>, or <code>YEAR</code>.</p>
         <p>E.g., <code>TIMESTAMPADD(WEEK, 1, DATE '2003-01-02')</code> returns <code>2003-01-09</code>.</p>
       </td>
     </tr>
+
+    <tr>
+      <td>
+        {% highlight text %}
+TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the (signed) number of <i>timepointunit</i> between <i>timepoint1</i> and <i>timepoint2</i>. The unit for the interval is given by the first argument, which should be one of the following values: <code>SECOND</code>, <code>MINUTE</code>, <code>HOUR</code>, <code>DAY</code>, <code>WEEK</code>, <code>MONTH</code>, <code>QUARTER</code>, or <code>YEAR</code>. See also the <a href="#time-interval-and-point-unit-specifiers">Time Interval and Point Unit Specifiers table</a>.</p>
+        <p>E.g., <code>TIMESTAMPDIFF(DAY, TIMESTAMP '2003-01-02 10:00:00', TIMESTAMP '2003-01-03 10:00:00')</code> leads to <code>1</code>.</p>
+      </td>
+    </tr>
+
   </tbody>
 </table>
 </div>
@@ -3564,6 +3577,19 @@ dateFormat(TIMESTAMP, STRING)
         <p>E.g., <code>dateFormat(ts, '%Y, %d %M')</code> results in strings formatted as "2017, 05 May".</p>
       </td>
     </tr>
+
+    <tr>
+      <td>
+        {% highlight java %}
+timestampDiff(TIMEPOINTUNIT, TIMEPOINT1, TIMEPOINT2)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the (signed) number of <i>TIMEPOINTUNIT</i> between <i>TIMEPOINT1</i> and <i>TIMEPOINT2</i>. The unit for the interval is given by the first argument, which should be one of the following values: <code>SECOND</code>, <code>MINUTE</code>, <code>HOUR</code>, <code>DAY</code>, <code>WEEK</code>, <code>MONTH</code>, <code>QUARTER</code>, or <code>YEAR</code>. See also the <a href="#time-interval-and-point-unit-specifiers">Time Interval and Point Unit Specifiers table</a>.</p>
+        <p>E.g., <code>timestampDiff(DAY, '2003-01-02 10:00:00'.toTimestamp, '2003-01-03 10:00:00'.toTimestamp)</code> leads to <code>1</code>.</p>
+      </td>
+    </tr>
+
     </tbody>
 </table>
 </div>
@@ -3809,6 +3835,19 @@ dateFormat(TIMESTAMP, STRING)
         <p>E.g., <code>dateFormat('ts, "%Y, %d %M")</code> results in strings formatted as "2017, 05 May".</p>
       </td>
     </tr>
+
+    <tr>
+      <td>
+        {% highlight scala %}
+timestampDiff(TIMEPOINTUNIT, TIMEPOINT1, TIMEPOINT2)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the (signed) number of <i>TIMEPOINTUNIT</i> between <i>TIMEPOINT1</i> and <i>TIMEPOINT2</i>. The unit for the interval is given by the first argument, which should be one of the following values: <code>SECOND</code>, <code>MINUTE</code>, <code>HOUR</code>, <code>DAY</code>, <code>WEEK</code>, <code>MONTH</code>, <code>QUARTER</code>, or <code>YEAR</code>. See also the <a href="#time-interval-and-point-unit-specifiers">Time Interval and Point Unit Specifiers table</a>.</p>
+        <p>E.g., <code>timestampDiff(TimePointUnit.DAY, '2003-01-02 10:00:00'.toTimestamp, '2003-01-03 10:00:00'.toTimestamp)</code> leads to <code>1</code>.</p>
+      </td>
+    </tr>
+
   </tbody>
 </table>
 </div>
@@ -5463,3 +5502,44 @@ The following table lists specifiers for date format functions.
 </table>
 
 {% top %}
+
+Time Interval and Point Unit Specifiers
+---------------------------------------
+
+The following table lists specifiers for time interval and time point units. 
+
+For Table API, please use `_` for spaces (e.g., `DAY_TO_HOUR`).
+
+| Time Interval Unit       | Time Point Unit                |
+| :----------------------- | :----------------------------- |
+| `MILLENNIUM` _(SQL-only)_ |                                |
+| `CENTURY` _(SQL-only)_   |                                |
+| `YEAR`                   | `YEAR`                         |
+| `YEAR TO MONTH`          |                                |
+| `QUARTER`                | `QUARTER`                      |
+| `MONTH`                  | `MONTH`                        |
+| `WEEK`                   | `WEEK`                         |
+| `DAY`                    | `DAY`                          |
+| `DAY TO HOUR`            |                                |
+| `DAY TO MINUTE`          |                                |
+| `DAY TO SECOND`          |                                |
+| `HOUR`                   | `HOUR`                         |
+| `HOUR TO MINUTE`         |                                |
+| `HOUR TO SECOND`         |                                |
+| `MINUTE`                 | `MINUTE`                       |
+| `MINUTE TO SECOND`       |                                |
+| `SECOND`                 | `SECOND`                       |
+|                          | `MILLISECOND`                  |
+|                          | `MICROSECOND`                  |
+| `DOY` _(SQL-only)_       |                                |
+| `DOW` _(SQL-only)_       |                                |
+|                          | `SQL_TSI_YEAR` _(SQL-only)_    |
+|                          | `SQL_TSI_QUARTER` _(SQL-only)_ |
+|                          | `SQL_TSI_MONTH` _(SQL-only)_   |
+|                          | `SQL_TSI_WEEK` _(SQL-only)_    |
+|                          | `SQL_TSI_DAY` _(SQL-only)_     |
+|                          | `SQL_TSI_HOUR` _(SQL-only)_    |
+|                          | `SQL_TSI_MINUTE` _(SQL-only)_  |
+|                          | `SQL_TSI_SECOND` _(SQL-only)_  |
+
+{% top %}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index 626012c..14adb71 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -26,6 +26,7 @@ import org.apache.flink.table.api.{CurrentRange, CurrentRow, TableException, Unb
 import org.apache.flink.table.expressions.ExpressionUtils.{convertArray, toMilliInterval, toMonthInterval, toRowInterval}
 import org.apache.flink.table.api.Table
 import org.apache.flink.table.expressions.TimeIntervalUnit.TimeIntervalUnit
+import org.apache.flink.table.expressions.TimePointUnit.TimePointUnit
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.{AggregateFunction, DistinctAggregateFunction}
 
@@ -1142,6 +1143,34 @@ object dateFormat {
 }
 
 /**
+  * Computes the (signed) number of [[TimePointUnit]] steps between two points in time.
+  *
+  * Example: `timestampDiff(TimePointUnit.DAY, '2016-06-15'.toDate, '2016-06-18'.toDate)`
+  * evaluates to 3.
+  */
+object timestampDiff {
+
+  /**
+    * Builds the expression that yields the (signed) number of [[TimePointUnit]] steps between
+    * timePoint1 and timePoint2.
+    *
+    * Example: `timestampDiff(TimePointUnit.DAY, '2016-06-15'.toDate, '2016-06-18'.toDate)`
+    * evaluates to 3.
+    *
+    * @param timePointUnit The unit in which the difference is expressed.
+    * @param timePoint1 The first point in time.
+    * @param timePoint2 The second point in time.
+    * @return The number of intervals as integer value.
+    */
+  def apply(
+      timePointUnit: TimePointUnit,
+      timePoint1: Expression,
+      timePoint2: Expression)
+    : Expression =
+    TimestampDiff(timePointUnit, timePoint1, timePoint2)
+}
+
+/**
   * Creates an array of literals. The array will be an array of objects (not primitives).
   */
 object array {
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index 0fdd885..19e030b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -769,7 +769,7 @@ abstract class CodeGenerator(
         val right = operands(1)
         requireTemporal(left)
         requireTemporal(right)
-        generateTemporalPlusMinus(plus = true, nullCheck, left, right, config)
+        generateTemporalPlusMinus(plus = true, nullCheck, resultType, left, right, config)
 
       case MINUS if isNumeric(resultType) =>
         val left = operands.head
@@ -783,7 +783,7 @@ abstract class CodeGenerator(
         val right = operands(1)
         requireTemporal(left)
         requireTemporal(right)
-        generateTemporalPlusMinus(plus = false, nullCheck, left, right, config)
+        generateTemporalPlusMinus(plus = false, nullCheck, resultType, left, right, config)
 
       case MULTIPLY if isNumeric(resultType) =>
         val left = operands.head
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index b6eb3e8..e320052 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.codegen.calls
 
 import java.lang.reflect.Method
 
+import org.apache.calcite.avatica.util.TimeUnit
 import org.apache.calcite.avatica.util.TimeUnitRange
 import org.apache.calcite.sql.SqlOperator
 import org.apache.calcite.sql.fun.SqlStdOperatorTable._
@@ -532,6 +533,29 @@ object FunctionGenerator {
     new ExtractCallGen(LONG_TYPE_INFO, BuiltInMethod.UNIX_DATE_EXTRACT.method))
 
   addSqlFunction(
+    TIMESTAMP_DIFF,
+    Seq(
+      new GenericTypeInfo(classOf[TimeUnit]),
+      SqlTimeTypeInfo.TIMESTAMP,
+      SqlTimeTypeInfo.TIMESTAMP),
+    new TimestampDiffCallGen)
+
+  addSqlFunction(
+    TIMESTAMP_DIFF,
+    Seq(new GenericTypeInfo(classOf[TimeUnit]), SqlTimeTypeInfo.TIMESTAMP, SqlTimeTypeInfo.DATE),
+    new TimestampDiffCallGen)
+
+  addSqlFunction(
+    TIMESTAMP_DIFF,
+    Seq(new GenericTypeInfo(classOf[TimeUnit]), SqlTimeTypeInfo.DATE, SqlTimeTypeInfo.TIMESTAMP),
+    new TimestampDiffCallGen)
+
+  addSqlFunction(
+    TIMESTAMP_DIFF,
+    Seq(new GenericTypeInfo(classOf[TimeUnit]), SqlTimeTypeInfo.DATE, SqlTimeTypeInfo.DATE),
+    new TimestampDiffCallGen)
+
+  addSqlFunction(
     FLOOR,
     Seq(SqlTimeTypeInfo.DATE, new GenericTypeInfo(classOf[TimeUnitRange])),
     new FloorCeilCallGen(
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
index 57f1618..282b167 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
@@ -825,6 +825,7 @@ object ScalarOperators {
   def generateTemporalPlusMinus(
       plus: Boolean,
       nullCheck: Boolean,
+      resultType: TypeInformation[_],
       left: GeneratedExpression,
       right: GeneratedExpression,
       config: TableConfig)
@@ -833,6 +834,7 @@ object ScalarOperators {
     val op = if (plus) "+" else "-"
 
     (left.resultType, right.resultType) match {
+      // arithmetic of time point and time interval
       case (l: TimeIntervalTypeInfo[_], r: TimeIntervalTypeInfo[_]) if l == r =>
         generateArithmeticOperator(op, nullCheck, l, left, right, config)
 
@@ -861,6 +863,38 @@ object ScalarOperators {
           (l, r) => s"${qualifyMethod(BuiltInMethod.ADD_MONTHS.method)}($l, $op($r))"
         }
 
+      // minus arithmetic of time points (i.e. for TIMESTAMPDIFF)
+      case (l: SqlTimeTypeInfo[_], r: SqlTimeTypeInfo[_]) if !plus =>
+        resultType match {
+          case TimeIntervalTypeInfo.INTERVAL_MONTHS =>
+            generateOperatorIfNotNull(nullCheck, resultType, left, right) {
+              (ll, rr) => (l, r) match {
+                case (SqlTimeTypeInfo.TIMESTAMP, SqlTimeTypeInfo.DATE) =>
+                  s"${qualifyMethod(BuiltInMethod.SUBTRACT_MONTHS.method)}" +
+                    s"($ll, $rr * ${MILLIS_PER_DAY}L)"
+                case (SqlTimeTypeInfo.DATE, SqlTimeTypeInfo.TIMESTAMP) =>
+                  s"${qualifyMethod(BuiltInMethod.SUBTRACT_MONTHS.method)}" +
+                    s"($ll * ${MILLIS_PER_DAY}L, $rr)"
+                case _ =>
+                  s"${qualifyMethod(BuiltInMethod.SUBTRACT_MONTHS.method)}($ll, $rr)"
+               }
+            }
+
+          case TimeIntervalTypeInfo.INTERVAL_MILLIS =>
+            generateOperatorIfNotNull(nullCheck, resultType, left, right) {
+              (ll, rr) => (l, r) match {
+                case (SqlTimeTypeInfo.TIMESTAMP, SqlTimeTypeInfo.TIMESTAMP) =>
+                  s"$ll $op $rr"
+                case (SqlTimeTypeInfo.DATE, SqlTimeTypeInfo.DATE) =>
+                  s"($ll * ${MILLIS_PER_DAY}L) $op ($rr * ${MILLIS_PER_DAY}L)"
+                case (SqlTimeTypeInfo.TIMESTAMP, SqlTimeTypeInfo.DATE) =>
+                  s"$ll $op ($rr * ${MILLIS_PER_DAY}L)"
+                case (SqlTimeTypeInfo.DATE, SqlTimeTypeInfo.TIMESTAMP) =>
+                  s"($ll * ${MILLIS_PER_DAY}L) $op $rr"
+              }
+            }
+        }
+
       case _ =>
         throw new CodeGenException("Unsupported temporal arithmetic.")
     }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TimestampDiffCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TimestampDiffCallGen.scala
new file mode 100644
index 0000000..5a7cd4f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TimestampDiffCallGen.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen.calls
+
+import org.apache.calcite.avatica.util.DateTimeUtils.MILLIS_PER_DAY
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.util.BuiltInMethod
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
+import org.apache.flink.table.codegen.CodeGenUtils._
+import org.apache.flink.table.codegen.calls.CallGenerator.generateCallIfArgsNotNull
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, GeneratedExpression}
+
+/**
+  * Generates the call for the timestamp difference function.
+  *
+  * The first operand carries the [[TimeUnit]]. The generated expression is the second operand
+  * minus the third operand, divided by the unit's multiplier, and is typed as INT. DATE and
+  * TIMESTAMP operands may be mixed.
+  */
+class TimestampDiffCallGen extends CallGenerator {
+
+  /**
+    * Generates the difference expression for the given operands.
+    *
+    * @param codeGenerator generator that supplies the null-check setting
+    * @param operands the time unit followed by the two time point operands
+    * @return the generated INT expression
+    * @throws CodeGenException if the unit is not supported, or if a millisecond-based unit is
+    *                          applied to an operand that is neither DATE nor TIMESTAMP
+    */
+  override def generate(
+      codeGenerator: CodeGenerator,
+      operands: Seq[GeneratedExpression])
+    : GeneratedExpression = {
+
+    val unit = getEnum(operands.head).asInstanceOf[TimeUnit]
+    unit match {
+      case TimeUnit.YEAR |
+           TimeUnit.MONTH |
+           TimeUnit.QUARTER =>
+        val divisor = unit.multiplier.intValue()
+        val subtractMonths = qualifyMethod(BuiltInMethod.SUBTRACT_MONTHS.method)
+        // a DATE term is only scaled when it is paired with a TIMESTAMP term; operands of the
+        // same type are handed to SUBTRACT_MONTHS unchanged
+        val (scaleFirst, scaleSecond) = (operands(1).resultType, operands(2).resultType) match {
+          case (SqlTimeTypeInfo.TIMESTAMP, SqlTimeTypeInfo.DATE) => (false, true)
+          case (SqlTimeTypeInfo.DATE, SqlTimeTypeInfo.TIMESTAMP) => (true, false)
+          case _ => (false, false)
+        }
+        generateCallIfArgsNotNull(codeGenerator.nullCheck, INT_TYPE_INFO, operands) {
+          (terms) =>
+            val first = toMillisTerm(terms(1), scaleFirst)
+            val second = toMillisTerm(terms(2), scaleSecond)
+            s"$subtractMonths($first, $second) / $divisor"
+        }
+
+      case TimeUnit.WEEK |
+           TimeUnit.DAY |
+           TimeUnit.HOUR |
+           TimeUnit.MINUTE |
+           TimeUnit.SECOND =>
+        val divisor = unit.multiplier.intValue()
+        // resolved eagerly so that unsupported operand types fail before any code is emitted
+        val scaleFirst = isDateOperand(operands(1))
+        val scaleSecond = isDateOperand(operands(2))
+        generateCallIfArgsNotNull(codeGenerator.nullCheck, INT_TYPE_INFO, operands) {
+          (terms) =>
+            val first = toMillisTerm(terms(1), scaleFirst)
+            val second = toMillisTerm(terms(2), scaleSecond)
+            s"(int)(($first - $second) / $divisor)"
+        }
+
+      case _ =>
+        throw new CodeGenException(
+          s"Unit '$unit' can not be applied to the timestamp difference function.")
+    }
+  }
+
+  /**
+    * Returns true for a DATE operand and false for a TIMESTAMP operand. Any other operand type
+    * is rejected with a [[CodeGenException]] instead of surfacing as a MatchError.
+    */
+  private def isDateOperand(operand: GeneratedExpression): Boolean =
+    operand.resultType match {
+      case SqlTimeTypeInfo.DATE => true
+      case SqlTimeTypeInfo.TIMESTAMP => false
+      case other =>
+        throw new CodeGenException(
+          s"Type '$other' can not be applied to the timestamp difference function.")
+    }
+
+  /** Multiplies a DATE term by MILLIS_PER_DAY so that it can be combined with a TIMESTAMP term. */
+  private def toMillisTerm(term: String, isDate: Boolean): String =
+    if (isDate) s"$term * ${MILLIS_PER_DAY}L" else term
+}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
index c82d556..0b84f94 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
@@ -59,6 +59,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   lazy val TO_TIMESTAMP: Keyword = Keyword("toTimestamp")
   lazy val TRIM: Keyword = Keyword("trim")
   lazy val EXTRACT: Keyword = Keyword("extract")
+  lazy val TIMESTAMP_DIFF: Keyword = Keyword("timestampDiff")
   lazy val FLOOR: Keyword = Keyword("floor")
   lazy val CEIL: Keyword = Keyword("ceil")
   lazy val LOG: Keyword = Keyword("log")
@@ -392,6 +393,11 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
       case _ ~ _ ~ operand ~ _ ~ unit ~ _ => Extract(unit, operand)
     }
 
+  // prefix notation: timestampDiff(<time point unit>, <expression>, <expression>)
+  lazy val prefixTimestampDiff: PackratParser[Expression] =
+    TIMESTAMP_DIFF ~ "(" ~ timePointUnit ~ "," ~ expression ~ "," ~ expression ~ ")" ^^ {
+      case _ ~ _ ~ pointUnit ~ _ ~ first ~ _ ~ second ~ _ =>
+        TimestampDiff(pointUnit, first, second)
+    }
+
   lazy val prefixFloor: PackratParser[Expression] =
     FLOOR ~ "(" ~ expression ~ "," ~ timeIntervalUnit ~ ")" ^^ {
       case _ ~ _ ~ operand ~ _ ~ unit ~ _ => TemporalFloor(unit, operand)
@@ -437,6 +443,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
     prefixAs| prefixToTimestamp | prefixToTime | prefixToDate |
     // expressions that take enumerations
     prefixCast | prefixTrim | prefixTrimWithoutArgs | prefixExtract | prefixFloor | prefixCeil |
+      prefixTimestampDiff |
     // expressions that take literals
     prefixGet |
     // expression with special identifier
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/arithmetic.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/arithmetic.scala
index 28049e8..3dc8899 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/arithmetic.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/arithmetic.scala
@@ -42,11 +42,10 @@ abstract class BinaryArithmetic extends BinaryExpression {
         throw new RuntimeException("This should never happen.")
     }
 
-  // TODO: tighten this rule once we implemented type coercion rules during validation
   override private[flink] def validateInput(): ValidationResult = {
     if (!isNumeric(left.resultType) || !isNumeric(right.resultType)) {
-      ValidationFailure(s"$this requires both operands Numeric, get " +
-        s"$left : ${left.resultType} and $right : ${right.resultType}")
+      ValidationFailure(s"The arithmetic '$this' requires both operands to be numeric, but was " +
+        s"'$left' : '${left.resultType}' and '$right' : '${right.resultType}'.")
     } else {
       ValidationSuccess
     }
@@ -93,9 +92,9 @@ case class Plus(left: Expression, right: Expression) extends BinaryArithmetic {
       ValidationSuccess
     } else {
       ValidationFailure(
-        s"$this requires Numeric, String, Intervals of same type, " +
-        s"or Interval and a time point input, " +
-        s"get $left : ${left.resultType} and $right : ${right.resultType}")
+        s"The arithmetic '$this' requires input that is numeric, string, time intervals of the " +
+        s"same type, or a time interval and a time point type, " +
+        s"but was '$left' : '${left.resultType}' and '$right' : '${right.resultType}'.")
     }
   }
 }
@@ -115,7 +114,8 @@ case class UnaryMinus(child: Expression) extends UnaryExpression {
     } else if (isTimeInterval(child.resultType)) {
       ValidationSuccess
     } else {
-      ValidationFailure(s"$this requires Numeric, or Interval input, get ${child.resultType}")
+      ValidationFailure(s"The arithmetic '$this' requires input that is numeric or a time " +
+        s"interval type, but was '${child.resultType}'.")
     }
   }
 }
@@ -132,8 +132,13 @@ case class Minus(left: Expression, right: Expression) extends BinaryArithmetic {
       ValidationSuccess
     } else if (isTimeInterval(left.resultType) && isTimePoint(right.resultType)) {
       ValidationSuccess
+    } else if (isNumeric(left.resultType) && isNumeric(right.resultType)) {
+      ValidationSuccess
     } else {
-      super.validateInput()
+      ValidationFailure(
+        s"The arithmetic '$this' requires inputs that are numeric, time intervals of the same " +
+        s"type, or a time interval and a time point type, " +
+        s"but was '$left' : '${left.resultType}' and '$right' : '${right.resultType}'.")
     }
   }
 }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
index ac996f6..b17537e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
@@ -340,3 +340,57 @@ case class DateFormat(timestamp: Expression, format: Expression) extends Express
 
   override private[flink] def resultType = STRING_TYPE_INFO
 }
+
+/**
+  * Expression for the (signed) number of time point units between two time points.
+  *
+  * @param timePointUnit symbol expression naming the unit of the result
+  * @param timePoint1 the first point in time
+  * @param timePoint2 the second point in time
+  */
+case class TimestampDiff(
+    timePointUnit: Expression,
+    timePoint1: Expression,
+    timePoint2: Expression)
+  extends Expression {
+
+  override private[flink] def children: Seq[Expression] =
+    timePointUnit :: timePoint1 :: timePoint2 :: Nil
+
+  /**
+    * Accepts the call only if both operands are time points of type DATE or TIMESTAMP and the
+    * unit is one of YEAR, QUARTER, MONTH, WEEK, DAY, HOUR, MINUTE, or SECOND.
+    */
+  override private[flink] def validateInput(): ValidationResult = {
+    def isDateOrTimestamp(e: Expression): Boolean =
+      e.resultType == SqlTimeTypeInfo.DATE || e.resultType == SqlTimeTypeInfo.TIMESTAMP
+
+    if (!TypeCheckUtils.isTimePoint(timePoint1.resultType)) {
+      ValidationFailure(
+        s"$this requires an input time point type, " +
+        s"but timePoint1 is of type '${timePoint1.resultType}'.")
+    } else if (!TypeCheckUtils.isTimePoint(timePoint2.resultType)) {
+      ValidationFailure(
+        s"$this requires an input time point type, " +
+        s"but timePoint2 is of type '${timePoint2.resultType}'.")
+    } else {
+      timePointUnit match {
+        case SymbolExpression(TimePointUnit.YEAR)
+             | SymbolExpression(TimePointUnit.QUARTER)
+             | SymbolExpression(TimePointUnit.MONTH)
+             | SymbolExpression(TimePointUnit.WEEK)
+             | SymbolExpression(TimePointUnit.DAY)
+             | SymbolExpression(TimePointUnit.HOUR)
+             | SymbolExpression(TimePointUnit.MINUTE)
+             | SymbolExpression(TimePointUnit.SECOND)
+          // both operands have to qualify; checking them with a single disjunction would let
+          // a pair such as (DATE, TIME) pass validation
+          if isDateOrTimestamp(timePoint1) && isDateOrTimestamp(timePoint2) =>
+          ValidationSuccess
+
+        case _ =>
+          ValidationFailure(s"$this operator does not support unit '$timePointUnit'" +
+              s" for input of type ('${timePoint1.resultType}', '${timePoint2.resultType}').")
+      }
+    }
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    // the time points are handed over in reverse order (timePoint2 before timePoint1)
+    // NOTE(review): presumably because the call subtracts its last operand from the one before
+    // it, which yields timePoint2 - timePoint1 -- confirm against the code generator
+    relBuilder
+    .getRexBuilder
+    .makeCall(SqlStdOperatorTable.TIMESTAMP_DIFF,
+       Seq(timePointUnit.toRexNode, timePoint2.toRexNode, timePoint1.toRexNode))
+  }
+
+  override def toString: String = s"timestampDiff(${children.mkString(", ")})"
+
+  override private[flink] def resultType = INT_TYPE_INFO
+}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index c60f979..801d2e9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -257,6 +257,7 @@ object FunctionCatalog {
     "temporalOverlaps" -> classOf[TemporalOverlaps],
     "dateTimePlus" -> classOf[Plus],
     "dateFormat" -> classOf[DateFormat],
+    "timestampDiff" -> classOf[TimestampDiff],
 
     // item
     "at" -> classOf[ItemAt],
@@ -446,6 +447,7 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
     ScalarSqlFunctions.BIN,
     ScalarSqlFunctions.HEX,
     SqlStdOperatorTable.TIMESTAMP_ADD,
+    SqlStdOperatorTable.TIMESTAMP_DIFF,
     ScalarSqlFunctions.LOG,
     ScalarSqlFunctions.LPAD,
     ScalarSqlFunctions.RPAD,
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index e8708cb..7614f19 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -2248,6 +2248,121 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   }
 
   @Test
+  def testTimestampDiff(): Unit = {
+    val casesByUnit = Map(
+      ("DAY", TimePointUnit.DAY, "SQL_TSI_DAY") -> Seq(
+        ("2018-07-03 11:11:11", "2018-07-05 11:11:11", "2"), // timestamp, timestamp
+        ("2016-06-15", "2016-06-16 11:11:11", "1"), // date, timestamp
+        ("2016-06-15 11:00:00", "2016-06-19", "3"), // timestamp, date
+        ("2016-06-15", "2016-06-18", "3") // date, date
+      ),
+      ("HOUR", TimePointUnit.HOUR, "SQL_TSI_HOUR") -> Seq(
+        ("2018-07-03 11:11:11", "2018-07-04 12:12:11", "25"),
+        ("2016-06-15", "2016-06-16 11:11:11", "35"),
+        ("2016-06-15 11:00:00", "2016-06-19", "85"),
+        ("2016-06-15", "2016-06-12", "-72")
+      ),
+      ("MINUTE", TimePointUnit.MINUTE, "SQL_TSI_MINUTE") -> Seq(
+        ("2018-07-03 11:11:11", "2018-07-03 12:10:11", "59"),
+        ("2016-06-15", "2016-06-16 11:11:11", "2111"),
+        ("2016-06-15 11:00:00", "2016-06-19", "5100"),
+        ("2016-06-15", "2016-06-18", "4320")
+      ),
+      ("SECOND", TimePointUnit.SECOND, "SQL_TSI_SECOND") -> Seq(
+        ("2018-07-03 11:11:11", "2018-07-03 11:12:12", "61"),
+        ("2016-06-15", "2016-06-16 11:11:11", "126671"),
+        ("2016-06-15 11:00:00", "2016-06-19", "306000"),
+        ("2016-06-15", "2016-06-18", "259200")
+      ),
+      ("WEEK", TimePointUnit.WEEK, "SQL_TSI_WEEK") -> Seq(
+        ("2018-05-03 11:11:11", "2018-07-03 11:12:12", "8"),
+        ("2016-04-15", "2016-07-16 11:11:11", "13"),
+        ("2016-04-15 11:00:00", "2016-09-19", "22"),
+        ("2016-08-15", "2016-06-18", "-8")
+      ),
+      ("MONTH", TimePointUnit.MONTH, "SQL_TSI_MONTH") -> Seq(
+        ("2018-07-03 11:11:11", "2018-09-05 11:11:11", "2"),
+        ("2016-06-15", "2018-06-16 11:11:11", "24"),
+        ("2016-06-15 11:00:00", "2018-05-19", "23"),
+        ("2016-06-15", "2018-03-18", "21")
+      ),
+      ("QUARTER", TimePointUnit.QUARTER, "SQL_TSI_QUARTER") -> Seq(
+        ("2018-01-03 11:11:11", "2018-09-05 11:11:11", "2"),
+        ("2016-06-15", "2018-06-16 11:11:11", "8"),
+        ("2016-06-15 11:00:00", "2018-05-19", "7"),
+        ("2016-06-15", "2018-03-18", "7")
+      )
+    )
+    // Row index picks the operand typing: 0 ts/ts, 1 date/ts, 2 ts/date, 3 date/date.
+    for (((unitName, timeUnit, tsiName), rows) <- casesByUnit) {
+      for (((point1, point2, expected), rowIdx) <- rows.zipWithIndex) {
+        rowIdx match {
+          case 0 => // timestamp, timestamp
+            testAllApis(
+              timestampDiff(timeUnit, point1.toTimestamp, point2.toTimestamp),
+              s"timestampDiff($unitName, '$point1'.toTimestamp, '$point2'.toTimestamp)",
+              s"TIMESTAMPDIFF($unitName, TIMESTAMP '$point1', TIMESTAMP '$point2')",
+              expected
+            )
+            testSqlApi( // same query spelled with the SQL_TSI_* unit name
+              s"TIMESTAMPDIFF($tsiName, TIMESTAMP '$point1', TIMESTAMP '$point2')",
+              expected
+            )
+          case 1 => // date, timestamp
+            testAllApis(
+              timestampDiff(timeUnit, point1.toDate, point2.toTimestamp),
+              s"timestampDiff($unitName, '$point1'.toDate, '$point2'.toTimestamp)",
+              s"TIMESTAMPDIFF($unitName, DATE '$point1', TIMESTAMP '$point2')",
+              expected
+            )
+            testSqlApi( // same query spelled with the SQL_TSI_* unit name
+              s"TIMESTAMPDIFF($tsiName, DATE '$point1', TIMESTAMP '$point2')",
+              expected
+            )
+          case 2 => // timestamp, date
+            testAllApis(
+              timestampDiff(timeUnit, point1.toTimestamp, point2.toDate),
+              s"timestampDiff($unitName, '$point1'.toTimestamp, '$point2'.toDate)",
+              s"TIMESTAMPDIFF($unitName, TIMESTAMP '$point1', DATE '$point2')",
+              expected
+            )
+            testSqlApi( // same query spelled with the SQL_TSI_* unit name
+              s"TIMESTAMPDIFF($tsiName, TIMESTAMP '$point1', DATE '$point2')",
+              expected
+            )
+          case 3 => // date, date
+            testAllApis(
+              timestampDiff(timeUnit, point1.toDate, point2.toDate),
+              s"timestampDiff($unitName, '$point1'.toDate, '$point2'.toDate)",
+              s"TIMESTAMPDIFF($unitName, DATE '$point1', DATE '$point2')",
+              expected
+            )
+            testSqlApi( // same query spelled with the SQL_TSI_* unit name
+              s"TIMESTAMPDIFF($tsiName, DATE '$point1', DATE '$point2')",
+              expected
+            )
+        }
+      }
+    }
+    // A NULL time point on either side is expected to give a NULL result.
+    testAllApis(
+      timestampDiff(TimePointUnit.DAY, Null(Types.SQL_TIMESTAMP),
+        "2016-02-24 12:42:25".toTimestamp),
+      "timestampDiff(DAY, Null(SQL_TIMESTAMP), '2016-02-24 12:42:25'.toTimestamp)",
+      "TIMESTAMPDIFF(DAY, CAST(NULL AS TIMESTAMP), TIMESTAMP '2016-02-24 12:42:25')",
+      "null"
+    )
+
+    testAllApis(
+      timestampDiff(TimePointUnit.DAY, "2016-02-24 12:42:25".toTimestamp,
+        Null(Types.SQL_TIMESTAMP)),
+      "timestampDiff(DAY, '2016-02-24 12:42:25'.toTimestamp,  Null(SQL_TIMESTAMP))",
+      "TIMESTAMPDIFF(DAY, TIMESTAMP '2016-02-24 12:42:25',  CAST(NULL AS TIMESTAMP))",
+      "null"
+    )
+  }
+
+  @Test
   def testTimestampAdd(): Unit = {
     val data = Seq(
       (1, "2017-11-29 22:58:58.998"),
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala
index 05e4e9b..8a5691d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala
@@ -19,8 +19,9 @@
 package org.apache.flink.table.expressions.validation
 
 import org.apache.calcite.avatica.util.TimeUnit
-import org.apache.flink.table.api.{SqlParserException, ValidationException}
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{SqlParserException, ValidationException}
+import org.apache.flink.table.expressions.TimePointUnit
 import org.apache.flink.table.expressions.utils.ScalarTypesTestBase
 import org.junit.Test
 
@@ -99,6 +100,18 @@ class ScalarFunctionsValidationTest extends ScalarTypesTestBase {
   }
 
   @Test(expected = classOf[ValidationException])
+  def testTimestampDiffWithWrongTime(): Unit = {
+    // Both time points are raw strings (no .toDate/.toTimestamp); the test expects rejection.
+    testTableApi(timestampDiff(TimePointUnit.DAY, "2016-02-24", "2016-02-27"), "FAIL", "FAIL")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testTimestampDiffWithWrongTimeAndUnit(): Unit = {
+    // Same raw-string time points as the DAY variant, this time with MINUTE as the unit.
+    testTableApi(timestampDiff(TimePointUnit.MINUTE, "2016-02-24", "2016-02-27"), "FAIL", "FAIL")
+  }
+
+  @Test(expected = classOf[ValidationException])
   def testDOWWithTimeWhichIsUnsupported(): Unit = {
     testSqlApi("EXTRACT(DOW FROM TIME '12:42:25')", "0")
   }