You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/08/26 14:21:53 UTC

flink git commit: [FLINK-3580] [table] Implement FLOOR/CEIL for time points

Repository: flink
Updated Branches:
  refs/heads/master b05ea6939 -> 6a456c673


[FLINK-3580] [table] Implement FLOOR/CEIL for time points

This closes #2391.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6a456c67
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6a456c67
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6a456c67

Branch: refs/heads/master
Commit: 6a456c67316a5e8ad3256e4cbfe37397b0c87282
Parents: b05ea69
Author: twalthr <tw...@apache.org>
Authored: Fri Aug 19 12:18:49 2016 +0200
Committer: twalthr <tw...@apache.org>
Committed: Fri Aug 26 16:19:59 2016 +0200

----------------------------------------------------------------------
 docs/dev/table_api.md                           |  66 ++++++++++
 .../flink/api/scala/table/expressionDsl.scala   |  20 ++-
 .../flink/api/table/codegen/CodeGenUtils.scala  |  13 +-
 .../table/codegen/calls/FloorCeilCallGen.scala  |  53 ++++++--
 .../table/codegen/calls/ScalarFunctions.scala   |  44 ++++++-
 .../table/expressions/ExpressionParser.scala    |  24 +++-
 .../flink/api/table/expressions/time.scala      |  70 ++++++++++-
 .../api/table/validate/FunctionCatalog.scala    |   3 +
 .../table/expressions/ScalarFunctionsTest.scala | 123 +++++++++++++++++++
 9 files changed, 396 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6a456c67/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index cdd3667..7a20e6a 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -1442,6 +1442,28 @@ TEMPORAL.extract(TIMEINTERVALUNIT)
       </td>
     </tr>
 
+    <tr>
+      <td>
+        {% highlight java %}
+TIMEPOINT.floor(TIMEINTERVALUNIT)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Rounds a time point down to the given unit. E.g. <code>"12:44:31".toDate.floor(MINUTE)</code> leads to 12:44:00.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight java %}
+TIMEPOINT.ceil(TIMEINTERVALUNIT)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Rounds a time point up to the given unit. E.g. <code>"12:44:31".toTime.floor(MINUTE)</code> leads to 12:45:00.</p>
+      </td>
+    </tr>
+
   </tbody>
 </table>
 
@@ -1683,6 +1705,28 @@ TEMPORAL.extract(TimeIntervalUnit)
       </td>
     </tr>
 
+    <tr>
+      <td>
+        {% highlight scala %}
+TIMEPOINT.floor(TimeIntervalUnit)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Rounds a time point down to the given unit. E.g. <code>"12:44:31".toTime.floor(TimeIntervalUnit.MINUTE)</code> leads to 12:44:00.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight scala %}
+TIMEPOINT.ceil(TimeIntervalUnit)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Rounds a time point up to the given unit. E.g. <code>"12:44:31".toTime.floor(TimeIntervalUnit.MINUTE)</code> leads to 12:45:00.</p>
+      </td>
+    </tr>
+
   </tbody>
 </table>
 </div>
@@ -1926,6 +1970,28 @@ EXTRACT(TIMEINTERVALUNIT FROM TEMPORAL)
       </td>
     </tr>
 
+    <tr>
+      <td>
+        {% highlight sql %}
+FLOOR(TIMEPOINT TO TIMEINTERVALUNIT)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Rounds a time point down to the given unit. E.g. <code>FLOOR(TIME '12:44:31' TO MINUTE)</code> leads to 12:44:00.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight sql %}
+CEIL(TIMEPOINT TO TIMEINTERVALUNIT)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Rounds a time point up to the given unit. E.g. <code>CEIL(TIME '12:44:31' TO MINUTE)</code> leads to 12:45:00.</p>
+      </td>
+    </tr>
+
   </tbody>
 </table>
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/6a456c67/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
index 92c61a3..b14ca88 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
@@ -137,6 +137,8 @@ trait ImplicitExpressionOperations {
     */
   def ceil() = Ceil(expr)
 
+  // String operations
+
   /**
     * Creates a substring of the given string at given index for a given length.
     *
@@ -216,6 +218,8 @@ trait ImplicitExpressionOperations {
     */
   def similar(pattern: Expression) = Similar(expr, pattern)
 
+  // Temporal operations
+
   /**
     * Parses a date String in the form "yy-mm-dd" to a SQL Date.
     */
@@ -238,7 +242,21 @@ trait ImplicitExpressionOperations {
     */
   def extract(timeIntervalUnit: TimeIntervalUnit) = Extract(timeIntervalUnit, expr)
 
-  // interval types
+  /**
+    * Rounds down a time point to the given unit.
+    *
+    * e.g. "12:44:31".toDate.floor(MINUTE) leads to 12:44:00
+    */
+  def floor(timeIntervalUnit: TimeIntervalUnit) = TemporalFloor(timeIntervalUnit, expr)
+
+  /**
+    * Rounds up a time point to the given unit.
+    *
+    * e.g. "12:44:31".toDate.ceil(MINUTE) leads to 12:45:00
+    */
+  def ceil(timeIntervalUnit: TimeIntervalUnit) = TemporalCeil(timeIntervalUnit, expr)
+
+  // Interval types
 
   /**
     * Creates an interval of the given number of years.

http://git-wip-us.apache.org/repos/asf/flink/blob/6a456c67/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
index 170af54..76f9b02 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
@@ -143,8 +143,17 @@ object CodeGenUtils {
         s"${qualifyMethod(BuiltInMethod.TIMESTAMP_TO_LONG.method)}($resultTerm)"
     }
 
-  def compareEnum(term: String, enum: Enum[_]): Boolean =
-    term == qualifyEnum(enum)
+  def compareEnum(term: String, enum: Enum[_]): Boolean = term == qualifyEnum(enum)
+
+  def getEnum(genExpr: GeneratedExpression): Enum[_] = {
+    val split = genExpr.resultTerm.split('.')
+    val value = split.last
+    val clazz = genExpr.resultType.getTypeClass
+    enumValueOf(clazz, value)
+  }
+
+  def enumValueOf[T <: Enum[T]](cls: Class[_], stringValue: String): Enum[_] =
+    Enum.valueOf(cls.asInstanceOf[Class[T]], stringValue).asInstanceOf[Enum[_]]
 
 
   // ----------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/6a456c67/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/FloorCeilCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/FloorCeilCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/FloorCeilCallGen.scala
index 84f60a0..d41e9a7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/FloorCeilCallGen.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/FloorCeilCallGen.scala
@@ -20,25 +20,54 @@ package org.apache.flink.api.table.codegen.calls
 
 import java.lang.reflect.Method
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo.
-  {DOUBLE_TYPE_INFO, FLOAT_TYPE_INFO,BIG_DEC_TYPE_INFO}
+import org.apache.calcite.avatica.util.TimeUnitRange
+import org.apache.calcite.avatica.util.TimeUnitRange.{MONTH, YEAR}
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{BIG_DEC_TYPE_INFO, DOUBLE_TYPE_INFO, FLOAT_TYPE_INFO}
+import org.apache.flink.api.table.codegen.CodeGenUtils.{getEnum, primitiveTypeTermForTypeInfo, qualifyMethod}
+import org.apache.flink.api.table.codegen.calls.CallGenerator.generateCallIfArgsNotNull
 import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression}
 
 /**
-  * Generates arithmetic floor/ceil function calls.
+  * Generates floor/ceil function calls.
   */
-class FloorCeilCallGen(method: Method) extends MultiTypeMethodCallGen(method) {
+class FloorCeilCallGen(
+    arithmeticMethod: Method,
+    temporalMethod: Option[Method] = None)
+  extends MultiTypeMethodCallGen(arithmeticMethod) {
 
   override def generate(
       codeGenerator: CodeGenerator,
       operands: Seq[GeneratedExpression])
-    : GeneratedExpression = {
-    operands.head.resultType match {
-      case FLOAT_TYPE_INFO | DOUBLE_TYPE_INFO | BIG_DEC_TYPE_INFO =>
-        super.generate(codeGenerator, operands)
-      case _ =>
-        operands.head // no floor/ceil necessary
-    }
-  }
+    : GeneratedExpression = operands.size match {
+    // arithmetic
+    case 1 =>
+      operands.head.resultType match {
+        case FLOAT_TYPE_INFO | DOUBLE_TYPE_INFO | BIG_DEC_TYPE_INFO =>
+          super.generate(codeGenerator, operands)
+        case _ =>
+          operands.head // no floor/ceil necessary
+      }
+
+    // temporal
+    case 2 =>
+      val operand = operands.head
+      val unit = getEnum(operands(1)).asInstanceOf[TimeUnitRange]
+      val internalType = primitiveTypeTermForTypeInfo(operand.resultType)
 
+      generateCallIfArgsNotNull(codeGenerator.nullCheck, operand.resultType, operands) {
+        (terms) =>
+          unit match {
+            case YEAR | MONTH =>
+              s"""
+                |($internalType) ${qualifyMethod(temporalMethod.get)}(${terms(1)}, ${terms.head})
+                |""".stripMargin
+            case _ =>
+              s"""
+                |${qualifyMethod(arithmeticMethod)}(
+                |  ($internalType) ${terms.head},
+                |  ($internalType) ${unit.startUnit.multiplier.intValue()})
+                |""".stripMargin
+          }
+      }
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6a456c67/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
index 44cb6d2..8aa632f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
@@ -26,7 +26,7 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable._
 import org.apache.calcite.sql.fun.SqlTrimFunction
 import org.apache.calcite.util.BuiltInMethod
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.GenericTypeInfo
 import org.apache.flink.api.table.functions.utils.ScalarSqlFunction
 
@@ -181,6 +181,48 @@ object ScalarFunctions {
     LONG_TYPE_INFO,
     BuiltInMethod.UNIX_DATE_EXTRACT.method)
 
+  addSqlFunction(
+    FLOOR,
+    Seq(SqlTimeTypeInfo.DATE, new GenericTypeInfo(classOf[TimeUnitRange])),
+    new FloorCeilCallGen(
+      BuiltInMethod.FLOOR.method,
+      Some(BuiltInMethod.UNIX_DATE_FLOOR.method)))
+
+  addSqlFunction(
+    FLOOR,
+    Seq(SqlTimeTypeInfo.TIME, new GenericTypeInfo(classOf[TimeUnitRange])),
+    new FloorCeilCallGen(
+      BuiltInMethod.FLOOR.method,
+      Some(BuiltInMethod.UNIX_DATE_FLOOR.method)))
+
+  addSqlFunction(
+    FLOOR,
+    Seq(SqlTimeTypeInfo.TIMESTAMP, new GenericTypeInfo(classOf[TimeUnitRange])),
+    new FloorCeilCallGen(
+      BuiltInMethod.FLOOR.method,
+      Some(BuiltInMethod.UNIX_TIMESTAMP_FLOOR.method)))
+
+  addSqlFunction(
+    CEIL,
+    Seq(SqlTimeTypeInfo.DATE, new GenericTypeInfo(classOf[TimeUnitRange])),
+    new FloorCeilCallGen(
+      BuiltInMethod.CEIL.method,
+      Some(BuiltInMethod.UNIX_DATE_CEIL.method)))
+
+  addSqlFunction(
+    CEIL,
+    Seq(SqlTimeTypeInfo.TIME, new GenericTypeInfo(classOf[TimeUnitRange])),
+    new FloorCeilCallGen(
+      BuiltInMethod.CEIL.method,
+      Some(BuiltInMethod.UNIX_DATE_CEIL.method)))
+
+  addSqlFunction(
+    CEIL,
+    Seq(SqlTimeTypeInfo.TIMESTAMP, new GenericTypeInfo(classOf[TimeUnitRange])),
+    new FloorCeilCallGen(
+      BuiltInMethod.CEIL.method,
+      Some(BuiltInMethod.UNIX_TIMESTAMP_CEIL.method)))
+
   // ----------------------------------------------------------------------------------------------
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/6a456c67/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
index 1dd480b..c57d43b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
@@ -26,6 +26,7 @@ import org.apache.flink.api.table.expressions.TimePointUnit.TimePointUnit
 import org.apache.flink.api.table.expressions.TrimMode.TrimMode
 import org.apache.flink.api.table.typeutils.IntervalTypeInfo
 
+import scala.language.implicitConversions
 import scala.util.parsing.combinator.{JavaTokenParsers, PackratParsers}
 
 /**
@@ -65,6 +66,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   lazy val TO_TIMESTAMP: Keyword = Keyword("toTimestamp")
   lazy val TRIM: Keyword = Keyword("trim")
   lazy val EXTRACT: Keyword = Keyword("extract")
+  lazy val FLOOR: Keyword = Keyword("floor")
+  lazy val CEIL: Keyword = Keyword("ceil")
   lazy val YEAR: Keyword = Keyword("year")
   lazy val MONTH: Keyword = Keyword("month")
   lazy val DAY: Keyword = Keyword("day")
@@ -213,6 +216,14 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
     case operand ~ _  ~ _ ~ _ ~ unit ~ _ => Extract(unit, operand)
   }
 
+  lazy val suffixFloor = composite ~ "." ~ FLOOR ~ "(" ~ timeIntervalUnit ~ ")" ^^ {
+    case operand ~ _  ~ _ ~ _ ~ unit ~ _ => TemporalFloor(unit, operand)
+  }
+
+  lazy val suffixCeil = composite ~ "." ~ CEIL ~ "(" ~ timeIntervalUnit ~ ")" ^^ {
+    case operand ~ _  ~ _ ~ _ ~ unit ~ _ => TemporalCeil(unit, operand)
+  }
+
   lazy val suffixFunctionCall =
     composite ~ "." ~ functionIdent ~ "(" ~ repsep(expression, ",") ~ ")" ^^ {
     case operand ~ _ ~ name ~ _ ~ args ~ _ => Call(name.toUpperCase, operand :: args)
@@ -255,7 +266,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
     suffixTimeInterval | suffixIsNull | suffixIsNotNull | suffixSum | suffixMin | suffixMax |
       suffixCount | suffixAvg | suffixCast | suffixAs | suffixTrim | suffixTrimWithoutArgs |
       suffixIf | suffixAsc | suffixDesc | suffixToDate | suffixToTimestamp | suffixToTime |
-      suffixExtract | suffixFunctionCall // function call must always be at the end
+      suffixExtract | suffixFloor | suffixCeil |
+      suffixFunctionCall // function call must always be at the end
 
   // prefix operators
 
@@ -311,10 +323,18 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
     case _ ~ _ ~ operand ~ _ ~ unit ~ _ => Extract(unit, operand)
   }
 
+  lazy val prefixFloor = FLOOR ~ "(" ~ expression ~ "," ~ timeIntervalUnit ~ ")" ^^ {
+    case _ ~ _ ~ operand ~ _ ~ unit ~ _ => TemporalFloor(unit, operand)
+  }
+
+  lazy val prefixCeil = CEIL ~ "(" ~ expression ~ "," ~ timeIntervalUnit ~ ")" ^^ {
+    case _ ~ _ ~ operand ~ _ ~ unit ~ _ => TemporalCeil(unit, operand)
+  }
+
   lazy val prefixed: PackratParser[Expression] =
     prefixIsNull | prefixIsNotNull | prefixSum | prefixMin | prefixMax | prefixCount | prefixAvg |
       prefixCast | prefixAs | prefixTrim | prefixTrimWithoutArgs | prefixIf | prefixExtract |
-      prefixFunctionCall // function call must always be at the end
+      prefixFloor | prefixCeil | prefixFunctionCall // function call must always be at the end
 
   // suffix/prefix composite
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6a456c67/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala
index a10f4d0..48b512c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.api.table.FlinkRelBuilder
 import org.apache.flink.api.table.expressions.ExpressionUtils.{divide, getFactor, mod}
+import org.apache.flink.api.table.expressions.TimeIntervalUnit.TimeIntervalUnit
 import org.apache.flink.api.table.typeutils.{IntervalTypeInfo, TypeCheckUtils}
 import org.apache.flink.api.table.validate.{ExprValidationResult, ValidationFailure, ValidationSuccess}
 
@@ -63,8 +64,8 @@ case class Extract(timeIntervalUnit: Expression, temporal: Expression) extends E
         ValidationSuccess
 
       case _ =>
-        ValidationFailure(s"Extract operator does not support unit '$timeIntervalUnit' for input " +
-          s"of type '${temporal.resultType}'.")
+        ValidationFailure(s"Extract operator does not support unit '$timeIntervalUnit' for input" +
+          s" of type '${temporal.resultType}'.")
     }
   }
 
@@ -131,7 +132,72 @@ case class Extract(timeIntervalUnit: Expression, temporal: Expression) extends E
     result = divide(rexBuilder, result, unit.multiplier)
     result
   }
+}
+
+abstract class TemporalCeilFloor(
+    timeIntervalUnit: Expression,
+    temporal: Expression)
+  extends Expression {
+
+  override private[flink] def children: Seq[Expression] = timeIntervalUnit :: temporal :: Nil
+
+  override private[flink] def resultType: TypeInformation[_] = temporal.resultType
+
+  override private[flink] def validateInput(): ExprValidationResult = {
+    if (!TypeCheckUtils.isTimePoint(temporal.resultType)) {
+      return ValidationFailure(s"Temporal ceil/floor operator requires Time Point input, " +
+        s"but $temporal is of type ${temporal.resultType}")
+    }
+    val unit = timeIntervalUnit match {
+      case SymbolExpression(u: TimeIntervalUnit) => Some(u)
+      case _ => None
+    }
+    if (unit.isEmpty) {
+      return ValidationFailure(s"Temporal ceil/floor operator requires Time Interval Unit " +
+        s"input, but $timeIntervalUnit is of type ${timeIntervalUnit.resultType}")
+    }
+
+    (unit.get, temporal.resultType) match {
+      case (TimeIntervalUnit.YEAR | TimeIntervalUnit.MONTH,
+          SqlTimeTypeInfo.DATE | SqlTimeTypeInfo.TIMESTAMP) =>
+        ValidationSuccess
+      case (TimeIntervalUnit.DAY, SqlTimeTypeInfo.TIMESTAMP) =>
+        ValidationSuccess
+      case (TimeIntervalUnit.HOUR | TimeIntervalUnit.MINUTE | TimeIntervalUnit.SECOND,
+          SqlTimeTypeInfo.TIME | SqlTimeTypeInfo.TIMESTAMP) =>
+        ValidationSuccess
+      case _ =>
+        ValidationFailure(s"Temporal ceil/floor operator does not support " +
+          s"unit '$timeIntervalUnit' for input of type '${temporal.resultType}'.")
+    }
+  }
+}
+
+case class TemporalFloor(
+    timeIntervalUnit: Expression,
+    temporal: Expression)
+  extends TemporalCeilFloor(
+    timeIntervalUnit,
+    temporal) {
+
+  override def toString: String = s"($temporal).floor($timeIntervalUnit)"
 
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.FLOOR, temporal.toRexNode, timeIntervalUnit.toRexNode)
+  }
 }
 
+case class TemporalCeil(
+    timeIntervalUnit: Expression,
+    temporal: Expression)
+  extends TemporalCeilFloor(
+    timeIntervalUnit,
+    temporal) {
+
+  override def toString: String = s"($temporal).ceil($timeIntervalUnit)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.CEIL, temporal.toRexNode, timeIntervalUnit.toRexNode)
+  }
+}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6a456c67/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
index 58b1f69..fb38dde 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
@@ -146,6 +146,9 @@ object FunctionCatalog {
 
     // temporal functions
     "extract" -> classOf[Extract]
+    // TODO implement function overloading here
+    // "floor" -> classOf[TemporalFloor]
+    // "ceil" -> classOf[TemporalCeil]
   )
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/6a456c67/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
index 958bebe..7162a04 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
@@ -579,6 +579,129 @@ class ScalarFunctionsTest extends ExpressionTestBase {
       "2")
   }
 
+  @Test
+  def testTemporalFloor(): Unit = {
+    testAllApis(
+      'f18.floor(TimeIntervalUnit.YEAR),
+      "f18.floor(YEAR)",
+      "FLOOR(f18 TO YEAR)",
+      "1996-01-01 00:00:00.0")
+
+    testAllApis(
+      'f18.floor(TimeIntervalUnit.MONTH),
+      "f18.floor(MONTH)",
+      "FLOOR(f18 TO MONTH)",
+      "1996-11-01 00:00:00.0")
+
+    testAllApis(
+      'f18.floor(TimeIntervalUnit.DAY),
+      "f18.floor(DAY)",
+      "FLOOR(f18 TO DAY)",
+      "1996-11-10 00:00:00.0")
+
+    testAllApis(
+      'f18.floor(TimeIntervalUnit.MINUTE),
+      "f18.floor(MINUTE)",
+      "FLOOR(f18 TO MINUTE)",
+      "1996-11-10 06:55:00.0")
+
+    testAllApis(
+      'f18.floor(TimeIntervalUnit.SECOND),
+      "f18.floor(SECOND)",
+      "FLOOR(f18 TO SECOND)",
+      "1996-11-10 06:55:44.0")
+
+    testAllApis(
+      'f17.floor(TimeIntervalUnit.HOUR),
+      "f17.floor(HOUR)",
+      "FLOOR(f17 TO HOUR)",
+      "06:00:00")
+
+    testAllApis(
+      'f17.floor(TimeIntervalUnit.MINUTE),
+      "f17.floor(MINUTE)",
+      "FLOOR(f17 TO MINUTE)",
+      "06:55:00")
+
+    testAllApis(
+      'f17.floor(TimeIntervalUnit.SECOND),
+      "f17.floor(SECOND)",
+      "FLOOR(f17 TO SECOND)",
+      "06:55:44")
+
+    testAllApis(
+      'f16.floor(TimeIntervalUnit.YEAR),
+      "f16.floor(YEAR)",
+      "FLOOR(f16 TO YEAR)",
+      "1996-01-01")
+
+    testAllApis(
+      'f16.floor(TimeIntervalUnit.MONTH),
+      "f16.floor(MONTH)",
+      "FLOOR(f16 TO MONTH)",
+      "1996-11-01")
+
+    testAllApis(
+      'f18.ceil(TimeIntervalUnit.YEAR),
+      "f18.ceil(YEAR)",
+      "CEIL(f18 TO YEAR)",
+      "1997-01-01 00:00:00.0")
+
+    testAllApis(
+      'f18.ceil(TimeIntervalUnit.MONTH),
+      "f18.ceil(MONTH)",
+      "CEIL(f18 TO MONTH)",
+      "1996-12-01 00:00:00.0")
+
+    testAllApis(
+      'f18.ceil(TimeIntervalUnit.DAY),
+      "f18.ceil(DAY)",
+      "CEIL(f18 TO DAY)",
+      "1996-11-11 00:00:00.0")
+
+    testAllApis(
+      'f18.ceil(TimeIntervalUnit.MINUTE),
+      "f18.ceil(MINUTE)",
+      "CEIL(f18 TO MINUTE)",
+      "1996-11-10 06:56:00.0")
+
+    testAllApis(
+      'f18.ceil(TimeIntervalUnit.SECOND),
+      "f18.ceil(SECOND)",
+      "CEIL(f18 TO SECOND)",
+      "1996-11-10 06:55:45.0")
+
+    testAllApis(
+      'f17.ceil(TimeIntervalUnit.HOUR),
+      "f17.ceil(HOUR)",
+      "CEIL(f17 TO HOUR)",
+      "07:00:00")
+
+    testAllApis(
+      'f17.ceil(TimeIntervalUnit.MINUTE),
+      "f17.ceil(MINUTE)",
+      "CEIL(f17 TO MINUTE)",
+      "06:56:00")
+
+    testAllApis(
+      'f17.ceil(TimeIntervalUnit.SECOND),
+      "f17.ceil(SECOND)",
+      "CEIL(f17 TO SECOND)",
+      "06:55:44")
+
+    testAllApis(
+      'f16.ceil(TimeIntervalUnit.YEAR),
+      "f16.ceil(YEAR)",
+      "CEIL(f16 TO YEAR)",
+      "1996-01-01")
+
+    testAllApis(
+      'f16.ceil(TimeIntervalUnit.MONTH),
+      "f16.ceil(MONTH)",
+      "CEIL(f16 TO MONTH)",
+      "1996-11-01")
+  }
+
   // ----------------------------------------------------------------------------------------------
 
   def testData = {