You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/08/26 14:21:53 UTC
flink git commit: [FLINK-3580] [table] Implement FLOOR/CEIL for time
points
Repository: flink
Updated Branches:
refs/heads/master b05ea6939 -> 6a456c673
[FLINK-3580] [table] Implement FLOOR/CEIL for time points
This closes #2391.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6a456c67
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6a456c67
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6a456c67
Branch: refs/heads/master
Commit: 6a456c67316a5e8ad3256e4cbfe37397b0c87282
Parents: b05ea69
Author: twalthr <tw...@apache.org>
Authored: Fri Aug 19 12:18:49 2016 +0200
Committer: twalthr <tw...@apache.org>
Committed: Fri Aug 26 16:19:59 2016 +0200
----------------------------------------------------------------------
docs/dev/table_api.md | 66 ++++++++++
.../flink/api/scala/table/expressionDsl.scala | 20 ++-
.../flink/api/table/codegen/CodeGenUtils.scala | 13 +-
.../table/codegen/calls/FloorCeilCallGen.scala | 53 ++++++--
.../table/codegen/calls/ScalarFunctions.scala | 44 ++++++-
.../table/expressions/ExpressionParser.scala | 24 +++-
.../flink/api/table/expressions/time.scala | 70 ++++++++++-
.../api/table/validate/FunctionCatalog.scala | 3 +
.../table/expressions/ScalarFunctionsTest.scala | 123 +++++++++++++++++++
9 files changed, 396 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6a456c67/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index cdd3667..7a20e6a 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -1442,6 +1442,28 @@ TEMPORAL.extract(TIMEINTERVALUNIT)
</td>
</tr>
+ <tr>
+ <td>
+ {% highlight java %}
+TIMEPOINT.floor(TIMEINTERVALUNIT)
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Rounds a time point down to the given unit. E.g. <code>"12:44:31".toTime.floor(MINUTE)</code> leads to 12:44:00.</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ {% highlight java %}
+TIMEPOINT.ceil(TIMEINTERVALUNIT)
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Rounds a time point up to the given unit. E.g. <code>"12:44:31".toTime.ceil(MINUTE)</code> leads to 12:45:00.</p>
+ </td>
+ </tr>
+
</tbody>
</table>
@@ -1683,6 +1705,28 @@ TEMPORAL.extract(TimeIntervalUnit)
</td>
</tr>
+ <tr>
+ <td>
+ {% highlight scala %}
+TIMEPOINT.floor(TimeIntervalUnit)
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Rounds a time point down to the given unit. E.g. <code>"12:44:31".toTime.floor(TimeIntervalUnit.MINUTE)</code> leads to 12:44:00.</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ {% highlight scala %}
+TIMEPOINT.ceil(TimeIntervalUnit)
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Rounds a time point up to the given unit. E.g. <code>"12:44:31".toTime.ceil(TimeIntervalUnit.MINUTE)</code> leads to 12:45:00.</p>
+ </td>
+ </tr>
+
</tbody>
</table>
</div>
@@ -1926,6 +1970,28 @@ EXTRACT(TIMEINTERVALUNIT FROM TEMPORAL)
</td>
</tr>
+ <tr>
+ <td>
+ {% highlight sql %}
+FLOOR(TIMEPOINT TO TIMEINTERVALUNIT)
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Rounds a time point down to the given unit. E.g. <code>FLOOR(TIME '12:44:31' TO MINUTE)</code> leads to 12:44:00.</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ {% highlight sql %}
+CEIL(TIMEPOINT TO TIMEINTERVALUNIT)
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Rounds a time point up to the given unit. E.g. <code>CEIL(TIME '12:44:31' TO MINUTE)</code> leads to 12:45:00.</p>
+ </td>
+ </tr>
+
</tbody>
</table>
</div>
http://git-wip-us.apache.org/repos/asf/flink/blob/6a456c67/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
index 92c61a3..b14ca88 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
@@ -137,6 +137,8 @@ trait ImplicitExpressionOperations {
*/
def ceil() = Ceil(expr)
+ // String operations
+
/**
* Creates a substring of the given string at given index for a given length.
*
@@ -216,6 +218,8 @@ trait ImplicitExpressionOperations {
*/
def similar(pattern: Expression) = Similar(expr, pattern)
+ // Temporal operations
+
/**
* Parses a date String in the form "yy-mm-dd" to a SQL Date.
*/
@@ -238,7 +242,21 @@ trait ImplicitExpressionOperations {
*/
def extract(timeIntervalUnit: TimeIntervalUnit) = Extract(timeIntervalUnit, expr)
- // interval types
+ /**
+ * Rounds down a time point to the given unit.
+ *
+ * e.g. "12:44:31".toTime.floor(MINUTE) leads to 12:44:00
+ */
+ def floor(timeIntervalUnit: TimeIntervalUnit) = TemporalFloor(timeIntervalUnit, expr)
+
+ /**
+ * Rounds up a time point to the given unit.
+ *
+ * e.g. "12:44:31".toTime.ceil(MINUTE) leads to 12:45:00
+ */
+ def ceil(timeIntervalUnit: TimeIntervalUnit) = TemporalCeil(timeIntervalUnit, expr)
+
+ // Interval types
/**
* Creates an interval of the given number of years.
http://git-wip-us.apache.org/repos/asf/flink/blob/6a456c67/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
index 170af54..76f9b02 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
@@ -143,8 +143,17 @@ object CodeGenUtils {
s"${qualifyMethod(BuiltInMethod.TIMESTAMP_TO_LONG.method)}($resultTerm)"
}
- def compareEnum(term: String, enum: Enum[_]): Boolean =
- term == qualifyEnum(enum)
+ def compareEnum(term: String, enum: Enum[_]): Boolean = term == qualifyEnum(enum)
+
+ def getEnum(genExpr: GeneratedExpression): Enum[_] = {
+ val split = genExpr.resultTerm.split('.')
+ val value = split.last
+ val clazz = genExpr.resultType.getTypeClass
+ enumValueOf(clazz, value)
+ }
+
+ def enumValueOf[T <: Enum[T]](cls: Class[_], stringValue: String): Enum[_] =
+ Enum.valueOf(cls.asInstanceOf[Class[T]], stringValue).asInstanceOf[Enum[_]]
// ----------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6a456c67/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/FloorCeilCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/FloorCeilCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/FloorCeilCallGen.scala
index 84f60a0..d41e9a7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/FloorCeilCallGen.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/FloorCeilCallGen.scala
@@ -20,25 +20,54 @@ package org.apache.flink.api.table.codegen.calls
import java.lang.reflect.Method
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo.
- {DOUBLE_TYPE_INFO, FLOAT_TYPE_INFO,BIG_DEC_TYPE_INFO}
+import org.apache.calcite.avatica.util.TimeUnitRange
+import org.apache.calcite.avatica.util.TimeUnitRange.{MONTH, YEAR}
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{BIG_DEC_TYPE_INFO, DOUBLE_TYPE_INFO, FLOAT_TYPE_INFO}
+import org.apache.flink.api.table.codegen.CodeGenUtils.{getEnum, primitiveTypeTermForTypeInfo, qualifyMethod}
+import org.apache.flink.api.table.codegen.calls.CallGenerator.generateCallIfArgsNotNull
import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression}
/**
- * Generates arithmetic floor/ceil function calls.
+ * Generates floor/ceil function calls.
*/
-class FloorCeilCallGen(method: Method) extends MultiTypeMethodCallGen(method) {
+class FloorCeilCallGen(
+ arithmeticMethod: Method,
+ temporalMethod: Option[Method] = None)
+ extends MultiTypeMethodCallGen(arithmeticMethod) {
override def generate(
codeGenerator: CodeGenerator,
operands: Seq[GeneratedExpression])
- : GeneratedExpression = {
- operands.head.resultType match {
- case FLOAT_TYPE_INFO | DOUBLE_TYPE_INFO | BIG_DEC_TYPE_INFO =>
- super.generate(codeGenerator, operands)
- case _ =>
- operands.head // no floor/ceil necessary
- }
- }
+ : GeneratedExpression = operands.size match {
+ // arithmetic
+ case 1 =>
+ operands.head.resultType match {
+ case FLOAT_TYPE_INFO | DOUBLE_TYPE_INFO | BIG_DEC_TYPE_INFO =>
+ super.generate(codeGenerator, operands)
+ case _ =>
+ operands.head // no floor/ceil necessary
+ }
+
+ // temporal
+ case 2 =>
+ val operand = operands.head
+ val unit = getEnum(operands(1)).asInstanceOf[TimeUnitRange]
+ val internalType = primitiveTypeTermForTypeInfo(operand.resultType)
+ generateCallIfArgsNotNull(codeGenerator.nullCheck, operand.resultType, operands) {
+ (terms) =>
+ unit match {
+ case YEAR | MONTH =>
+ s"""
+ |($internalType) ${qualifyMethod(temporalMethod.get)}(${terms(1)}, ${terms.head})
+ |""".stripMargin
+ case _ =>
+ s"""
+ |${qualifyMethod(arithmeticMethod)}(
+ | ($internalType) ${terms.head},
+ | ($internalType) ${unit.startUnit.multiplier.intValue()})
+ |""".stripMargin
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6a456c67/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
index 44cb6d2..8aa632f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
@@ -26,7 +26,7 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable._
import org.apache.calcite.sql.fun.SqlTrimFunction
import org.apache.calcite.util.BuiltInMethod
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.api.table.functions.utils.ScalarSqlFunction
@@ -181,6 +181,48 @@ object ScalarFunctions {
LONG_TYPE_INFO,
BuiltInMethod.UNIX_DATE_EXTRACT.method)
+ addSqlFunction(
+ FLOOR,
+ Seq(SqlTimeTypeInfo.DATE, new GenericTypeInfo(classOf[TimeUnitRange])),
+ new FloorCeilCallGen(
+ BuiltInMethod.FLOOR.method,
+ Some(BuiltInMethod.UNIX_DATE_FLOOR.method)))
+
+ addSqlFunction(
+ FLOOR,
+ Seq(SqlTimeTypeInfo.TIME, new GenericTypeInfo(classOf[TimeUnitRange])),
+ new FloorCeilCallGen(
+ BuiltInMethod.FLOOR.method,
+ Some(BuiltInMethod.UNIX_DATE_FLOOR.method)))
+
+ addSqlFunction(
+ FLOOR,
+ Seq(SqlTimeTypeInfo.TIMESTAMP, new GenericTypeInfo(classOf[TimeUnitRange])),
+ new FloorCeilCallGen(
+ BuiltInMethod.FLOOR.method,
+ Some(BuiltInMethod.UNIX_TIMESTAMP_FLOOR.method)))
+
+ addSqlFunction(
+ CEIL,
+ Seq(SqlTimeTypeInfo.DATE, new GenericTypeInfo(classOf[TimeUnitRange])),
+ new FloorCeilCallGen(
+ BuiltInMethod.CEIL.method,
+ Some(BuiltInMethod.UNIX_DATE_CEIL.method)))
+
+ addSqlFunction(
+ CEIL,
+ Seq(SqlTimeTypeInfo.TIME, new GenericTypeInfo(classOf[TimeUnitRange])),
+ new FloorCeilCallGen(
+ BuiltInMethod.CEIL.method,
+ Some(BuiltInMethod.UNIX_DATE_CEIL.method)))
+
+ addSqlFunction(
+ CEIL,
+ Seq(SqlTimeTypeInfo.TIMESTAMP, new GenericTypeInfo(classOf[TimeUnitRange])),
+ new FloorCeilCallGen(
+ BuiltInMethod.CEIL.method,
+ Some(BuiltInMethod.UNIX_TIMESTAMP_CEIL.method)))
+
// ----------------------------------------------------------------------------------------------
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/6a456c67/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
index 1dd480b..c57d43b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
@@ -26,6 +26,7 @@ import org.apache.flink.api.table.expressions.TimePointUnit.TimePointUnit
import org.apache.flink.api.table.expressions.TrimMode.TrimMode
import org.apache.flink.api.table.typeutils.IntervalTypeInfo
+import scala.language.implicitConversions
import scala.util.parsing.combinator.{JavaTokenParsers, PackratParsers}
/**
@@ -65,6 +66,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
lazy val TO_TIMESTAMP: Keyword = Keyword("toTimestamp")
lazy val TRIM: Keyword = Keyword("trim")
lazy val EXTRACT: Keyword = Keyword("extract")
+ lazy val FLOOR: Keyword = Keyword("floor")
+ lazy val CEIL: Keyword = Keyword("ceil")
lazy val YEAR: Keyword = Keyword("year")
lazy val MONTH: Keyword = Keyword("month")
lazy val DAY: Keyword = Keyword("day")
@@ -213,6 +216,14 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
case operand ~ _ ~ _ ~ _ ~ unit ~ _ => Extract(unit, operand)
}
+ lazy val suffixFloor = composite ~ "." ~ FLOOR ~ "(" ~ timeIntervalUnit ~ ")" ^^ {
+ case operand ~ _ ~ _ ~ _ ~ unit ~ _ => TemporalFloor(unit, operand)
+ }
+
+ lazy val suffixCeil = composite ~ "." ~ CEIL ~ "(" ~ timeIntervalUnit ~ ")" ^^ {
+ case operand ~ _ ~ _ ~ _ ~ unit ~ _ => TemporalCeil(unit, operand)
+ }
+
lazy val suffixFunctionCall =
composite ~ "." ~ functionIdent ~ "(" ~ repsep(expression, ",") ~ ")" ^^ {
case operand ~ _ ~ name ~ _ ~ args ~ _ => Call(name.toUpperCase, operand :: args)
@@ -255,7 +266,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
suffixTimeInterval | suffixIsNull | suffixIsNotNull | suffixSum | suffixMin | suffixMax |
suffixCount | suffixAvg | suffixCast | suffixAs | suffixTrim | suffixTrimWithoutArgs |
suffixIf | suffixAsc | suffixDesc | suffixToDate | suffixToTimestamp | suffixToTime |
- suffixExtract | suffixFunctionCall // function call must always be at the end
+ suffixExtract | suffixFloor | suffixCeil |
+ suffixFunctionCall // function call must always be at the end
// prefix operators
@@ -311,10 +323,18 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
case _ ~ _ ~ operand ~ _ ~ unit ~ _ => Extract(unit, operand)
}
+ lazy val prefixFloor = FLOOR ~ "(" ~ expression ~ "," ~ timeIntervalUnit ~ ")" ^^ {
+ case _ ~ _ ~ operand ~ _ ~ unit ~ _ => TemporalFloor(unit, operand)
+ }
+
+ lazy val prefixCeil = CEIL ~ "(" ~ expression ~ "," ~ timeIntervalUnit ~ ")" ^^ {
+ case _ ~ _ ~ operand ~ _ ~ unit ~ _ => TemporalCeil(unit, operand)
+ }
+
lazy val prefixed: PackratParser[Expression] =
prefixIsNull | prefixIsNotNull | prefixSum | prefixMin | prefixMax | prefixCount | prefixAvg |
prefixCast | prefixAs | prefixTrim | prefixTrimWithoutArgs | prefixIf | prefixExtract |
- prefixFunctionCall // function call must always be at the end
+ prefixFloor | prefixCeil | prefixFunctionCall // function call must always be at the end
// suffix/prefix composite
http://git-wip-us.apache.org/repos/asf/flink/blob/6a456c67/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala
index a10f4d0..48b512c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.api.table.FlinkRelBuilder
import org.apache.flink.api.table.expressions.ExpressionUtils.{divide, getFactor, mod}
+import org.apache.flink.api.table.expressions.TimeIntervalUnit.TimeIntervalUnit
import org.apache.flink.api.table.typeutils.{IntervalTypeInfo, TypeCheckUtils}
import org.apache.flink.api.table.validate.{ExprValidationResult, ValidationFailure, ValidationSuccess}
@@ -63,8 +64,8 @@ case class Extract(timeIntervalUnit: Expression, temporal: Expression) extends E
ValidationSuccess
case _ =>
- ValidationFailure(s"Extract operator does not support unit '$timeIntervalUnit' for input " +
- s"of type '${temporal.resultType}'.")
+ ValidationFailure(s"Extract operator does not support unit '$timeIntervalUnit' for input" +
+ s" of type '${temporal.resultType}'.")
}
}
@@ -131,7 +132,72 @@ case class Extract(timeIntervalUnit: Expression, temporal: Expression) extends E
result = divide(rexBuilder, result, unit.multiplier)
result
}
+}
+
+abstract class TemporalCeilFloor(
+ timeIntervalUnit: Expression,
+ temporal: Expression)
+ extends Expression {
+
+ override private[flink] def children: Seq[Expression] = timeIntervalUnit :: temporal :: Nil
+
+ override private[flink] def resultType: TypeInformation[_] = temporal.resultType
+
+ override private[flink] def validateInput(): ExprValidationResult = {
+ if (!TypeCheckUtils.isTimePoint(temporal.resultType)) {
+ return ValidationFailure(s"Temporal ceil/floor operator requires Time Point input, " +
+ s"but $temporal is of type ${temporal.resultType}")
+ }
+ val unit = timeIntervalUnit match {
+ case SymbolExpression(u: TimeIntervalUnit) => Some(u)
+ case _ => None
+ }
+ if (unit.isEmpty) {
+ return ValidationFailure(s"Temporal ceil/floor operator requires Time Interval Unit " +
+ s"input, but $timeIntervalUnit is of type ${timeIntervalUnit.resultType}")
+ }
+
+ (unit.get, temporal.resultType) match {
+ case (TimeIntervalUnit.YEAR | TimeIntervalUnit.MONTH,
+ SqlTimeTypeInfo.DATE | SqlTimeTypeInfo.TIMESTAMP) =>
+ ValidationSuccess
+ case (TimeIntervalUnit.DAY, SqlTimeTypeInfo.TIMESTAMP) =>
+ ValidationSuccess
+ case (TimeIntervalUnit.HOUR | TimeIntervalUnit.MINUTE | TimeIntervalUnit.SECOND,
+ SqlTimeTypeInfo.TIME | SqlTimeTypeInfo.TIMESTAMP) =>
+ ValidationSuccess
+ case _ =>
+ ValidationFailure(s"Temporal ceil/floor operator does not support " +
+ s"unit '$timeIntervalUnit' for input of type '${temporal.resultType}'.")
+ }
+ }
+}
+
+case class TemporalFloor(
+ timeIntervalUnit: Expression,
+ temporal: Expression)
+ extends TemporalCeilFloor(
+ timeIntervalUnit,
+ temporal) {
+
+ override def toString: String = s"($temporal).floor($timeIntervalUnit)"
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+ relBuilder.call(SqlStdOperatorTable.FLOOR, temporal.toRexNode, timeIntervalUnit.toRexNode)
+ }
}
+case class TemporalCeil(
+ timeIntervalUnit: Expression,
+ temporal: Expression)
+ extends TemporalCeilFloor(
+ timeIntervalUnit,
+ temporal) {
+
+ override def toString: String = s"($temporal).ceil($timeIntervalUnit)"
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+ relBuilder.call(SqlStdOperatorTable.CEIL, temporal.toRexNode, timeIntervalUnit.toRexNode)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6a456c67/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
index 58b1f69..fb38dde 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
@@ -146,6 +146,9 @@ object FunctionCatalog {
// temporal functions
"extract" -> classOf[Extract]
+ // TODO implement function overloading here
+ // "floor" -> classOf[TemporalFloor]
+ // "ceil" -> classOf[TemporalCeil]
)
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/6a456c67/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
index 958bebe..7162a04 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
@@ -579,6 +579,129 @@ class ScalarFunctionsTest extends ExpressionTestBase {
"2")
}
+ @Test
+ def testTemporalFloor(): Unit = {
+ testAllApis(
+ 'f18.floor(TimeIntervalUnit.YEAR),
+ "f18.floor(YEAR)",
+ "FLOOR(f18 TO YEAR)",
+ "1996-01-01 00:00:00.0")
+
+ testAllApis(
+ 'f18.floor(TimeIntervalUnit.MONTH),
+ "f18.floor(MONTH)",
+ "FLOOR(f18 TO MONTH)",
+ "1996-11-01 00:00:00.0")
+
+ testAllApis(
+ 'f18.floor(TimeIntervalUnit.DAY),
+ "f18.floor(DAY)",
+ "FLOOR(f18 TO DAY)",
+ "1996-11-10 00:00:00.0")
+
+ testAllApis(
+ 'f18.floor(TimeIntervalUnit.MINUTE),
+ "f18.floor(MINUTE)",
+ "FLOOR(f18 TO MINUTE)",
+ "1996-11-10 06:55:00.0")
+
+ testAllApis(
+ 'f18.floor(TimeIntervalUnit.SECOND),
+ "f18.floor(SECOND)",
+ "FLOOR(f18 TO SECOND)",
+ "1996-11-10 06:55:44.0")
+
+ testAllApis(
+ 'f17.floor(TimeIntervalUnit.HOUR),
+ "f17.floor(HOUR)",
+ "FLOOR(f17 TO HOUR)",
+ "06:00:00")
+
+ testAllApis(
+ 'f17.floor(TimeIntervalUnit.MINUTE),
+ "f17.floor(MINUTE)",
+ "FLOOR(f17 TO MINUTE)",
+ "06:55:00")
+
+ testAllApis(
+ 'f17.floor(TimeIntervalUnit.SECOND),
+ "f17.floor(SECOND)",
+ "FLOOR(f17 TO SECOND)",
+ "06:55:44")
+
+ testAllApis(
+ 'f16.floor(TimeIntervalUnit.YEAR),
+ "f16.floor(YEAR)",
+ "FLOOR(f16 TO YEAR)",
+ "1996-01-01")
+
+ testAllApis(
+ 'f16.floor(TimeIntervalUnit.MONTH),
+ "f16.floor(MONTH)",
+ "FLOOR(f16 TO MONTH)",
+ "1996-11-01")
+
+ testAllApis(
+ 'f18.ceil(TimeIntervalUnit.YEAR),
+ "f18.ceil(YEAR)",
+ "CEIL(f18 TO YEAR)",
+ "1997-01-01 00:00:00.0")
+
+ testAllApis(
+ 'f18.ceil(TimeIntervalUnit.MONTH),
+ "f18.ceil(MONTH)",
+ "CEIL(f18 TO MONTH)",
+ "1996-12-01 00:00:00.0")
+
+ testAllApis(
+ 'f18.ceil(TimeIntervalUnit.DAY),
+ "f18.ceil(DAY)",
+ "CEIL(f18 TO DAY)",
+ "1996-11-11 00:00:00.0")
+
+ testAllApis(
+ 'f18.ceil(TimeIntervalUnit.MINUTE),
+ "f18.ceil(MINUTE)",
+ "CEIL(f18 TO MINUTE)",
+ "1996-11-10 06:56:00.0")
+
+ testAllApis(
+ 'f18.ceil(TimeIntervalUnit.SECOND),
+ "f18.ceil(SECOND)",
+ "CEIL(f18 TO SECOND)",
+ "1996-11-10 06:55:45.0")
+
+ testAllApis(
+ 'f17.ceil(TimeIntervalUnit.HOUR),
+ "f17.ceil(HOUR)",
+ "CEIL(f17 TO HOUR)",
+ "07:00:00")
+
+ testAllApis(
+ 'f17.ceil(TimeIntervalUnit.MINUTE),
+ "f17.ceil(MINUTE)",
+ "CEIL(f17 TO MINUTE)",
+ "06:56:00")
+
+ testAllApis(
+ 'f17.ceil(TimeIntervalUnit.SECOND),
+ "f17.ceil(SECOND)",
+ "CEIL(f17 TO SECOND)",
+ "06:55:44")
+
+ testAllApis(
+ 'f16.ceil(TimeIntervalUnit.YEAR),
+ "f16.ceil(YEAR)",
+ "CEIL(f16 TO YEAR)",
+ "1996-01-01")
+
+ testAllApis(
+ 'f16.ceil(TimeIntervalUnit.MONTH),
+ "f16.ceil(MONTH)",
+ "CEIL(f16 TO MONTH)",
+ "1996-11-01")
+ }
+
// ----------------------------------------------------------------------------------------------
def testData = {