You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/05 14:46:23 UTC
[flink] 04/05: [FLINK-13237][table-planner-blink] Add LocalTime and
LocalDateTime support to planner expressions
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit beddbd4677306a8c975042d36f9c5597c55ca9ae
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Thu Jul 25 16:29:29 2019 +0800
[FLINK-13237][table-planner-blink] Add LocalTime and LocalDateTime support to planner expressions
---
.../expressions/PlannerExpressionConverter.scala | 3 +--
.../table/planner/expressions/comparison.scala | 10 ++++++++++
.../flink/table/planner/expressions/time.scala | 21 ++++++++++++++++-----
.../planner/typeutils/TypeInfoCheckUtils.scala | 7 ++++---
4 files changed, 31 insertions(+), 10 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
index 9b9fd36..90a8282 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
@@ -28,7 +28,6 @@ import org.apache.flink.table.planner.functions.InternalFunctionDefinitions.THRO
import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
import org.apache.flink.table.types.logical.LogicalTypeRoot.{CHAR, DECIMAL, SYMBOL, TIMESTAMP_WITHOUT_TIME_ZONE}
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks._
-import org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo
import _root_.scala.collection.JavaConverters._
@@ -91,7 +90,7 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
assert(children.size == 2)
return ThrowException(
children.head.accept(this),
- fromDataTypeToLegacyInfo(
+ fromDataTypeToTypeInfo(
children(1).asInstanceOf[TypeLiteralExpression].getOutputDataType))
case _ =>
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/comparison.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/comparison.scala
index b0684df..b8769f2 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/comparison.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/comparison.scala
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable
import org.apache.flink.table.planner.typeutils.TypeInfoCheckUtils.{isArray, isComparable, isNumeric}
import org.apache.flink.table.planner.validate._
+import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
import org.apache.calcite.sql.SqlOperator
@@ -34,6 +35,9 @@ abstract class BinaryComparison extends BinaryExpression {
(left.resultType, right.resultType) match {
case (lType, rType) if isNumeric(lType) && isNumeric(rType) => ValidationSuccess
case (lType, rType) if isComparable(lType) && lType == rType => ValidationSuccess
+ case (lType, rType) if isComparable(lType) &&
+ fromTypeInfoToLogicalType(lType) == fromTypeInfoToLogicalType(rType) =>
+ ValidationSuccess
case (lType, rType) =>
ValidationFailure(
s"Comparison is only supported for numeric types and " +
@@ -50,6 +54,9 @@ case class EqualTo(left: PlannerExpression, right: PlannerExpression) extends Bi
(left.resultType, right.resultType) match {
case (lType, rType) if isNumeric(lType) && isNumeric(rType) => ValidationSuccess
case (lType, rType) if lType == rType => ValidationSuccess
+ case (lType, rType)
+ if fromTypeInfoToLogicalType(lType) == fromTypeInfoToLogicalType(rType) =>
+ ValidationSuccess
case (lType, rType) if isArray(lType) && lType.getTypeClass == rType.getTypeClass =>
ValidationSuccess
case (lType, rType) =>
@@ -66,6 +73,9 @@ case class NotEqualTo(left: PlannerExpression, right: PlannerExpression) extends
(left.resultType, right.resultType) match {
case (lType, rType) if isNumeric(lType) && isNumeric(rType) => ValidationSuccess
case (lType, rType) if lType == rType => ValidationSuccess
+ case (lType, rType)
+ if fromTypeInfoToLogicalType(lType) == fromTypeInfoToLogicalType(rType) =>
+ ValidationSuccess
case (lType, rType) if isArray(lType) && lType.getTypeClass == rType.getTypeClass =>
ValidationSuccess
case (lType, rType) =>
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/time.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/time.scala
index 2429881..8817f5c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/time.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/time.scala
@@ -19,7 +19,7 @@
package org.apache.flink.table.planner.expressions
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo.{LocalTimeTypeInfo, SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.table.planner.calcite.FlinkRelBuilder
import org.apache.flink.table.planner.expressions.PlannerTimeIntervalUnit.PlannerTimeIntervalUnit
import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable
@@ -51,6 +51,8 @@ case class Extract(timeIntervalUnit: PlannerExpression, temporal: PlannerExpress
| SymbolPlannerExpression(PlannerTimeIntervalUnit.DAY)
if temporal.resultType == SqlTimeTypeInfo.DATE
|| temporal.resultType == SqlTimeTypeInfo.TIMESTAMP
+ || temporal.resultType == LocalTimeTypeInfo.LOCAL_DATE
+ || temporal.resultType == LocalTimeTypeInfo.LOCAL_DATE_TIME
|| temporal.resultType == TimeIntervalTypeInfo.INTERVAL_MILLIS
|| temporal.resultType == TimeIntervalTypeInfo.INTERVAL_MONTHS =>
ValidationSuccess
@@ -60,6 +62,8 @@ case class Extract(timeIntervalUnit: PlannerExpression, temporal: PlannerExpress
| SymbolPlannerExpression(PlannerTimeIntervalUnit.SECOND)
if temporal.resultType == SqlTimeTypeInfo.TIME
|| temporal.resultType == SqlTimeTypeInfo.TIMESTAMP
+ || temporal.resultType == LocalTimeTypeInfo.LOCAL_TIME
+ || temporal.resultType == LocalTimeTypeInfo.LOCAL_DATE_TIME
|| temporal.resultType == TimeIntervalTypeInfo.INTERVAL_MILLIS =>
ValidationSuccess
@@ -97,12 +101,15 @@ abstract class TemporalCeilFloor(
(unit.get, temporal.resultType) match {
case (PlannerTimeIntervalUnit.YEAR | PlannerTimeIntervalUnit.MONTH,
- SqlTimeTypeInfo.DATE | SqlTimeTypeInfo.TIMESTAMP) =>
+ SqlTimeTypeInfo.DATE | SqlTimeTypeInfo.TIMESTAMP |
+ LocalTimeTypeInfo.LOCAL_DATE | LocalTimeTypeInfo.LOCAL_DATE_TIME) =>
ValidationSuccess
- case (PlannerTimeIntervalUnit.DAY, SqlTimeTypeInfo.TIMESTAMP) =>
+ case (PlannerTimeIntervalUnit.DAY, SqlTimeTypeInfo.TIMESTAMP |
+ LocalTimeTypeInfo.LOCAL_DATE_TIME) =>
ValidationSuccess
case (PlannerTimeIntervalUnit.HOUR | PlannerTimeIntervalUnit.MINUTE |
- PlannerTimeIntervalUnit.SECOND, SqlTimeTypeInfo.TIME | SqlTimeTypeInfo.TIMESTAMP) =>
+ PlannerTimeIntervalUnit.SECOND, SqlTimeTypeInfo.TIME | SqlTimeTypeInfo.TIMESTAMP |
+ LocalTimeTypeInfo.LOCAL_TIME | LocalTimeTypeInfo.LOCAL_DATE_TIME) =>
ValidationSuccess
case _ =>
ValidationFailure(s"Temporal ceil/floor operator does not support " +
@@ -308,7 +315,11 @@ case class TimestampDiff(
if timePoint1.resultType == SqlTimeTypeInfo.DATE
|| timePoint1.resultType == SqlTimeTypeInfo.TIMESTAMP
|| timePoint2.resultType == SqlTimeTypeInfo.DATE
- || timePoint2.resultType == SqlTimeTypeInfo.TIMESTAMP =>
+ || timePoint2.resultType == SqlTimeTypeInfo.TIMESTAMP
+ || timePoint1.resultType == LocalTimeTypeInfo.LOCAL_DATE
+ || timePoint1.resultType == LocalTimeTypeInfo.LOCAL_DATE_TIME
+ || timePoint2.resultType == LocalTimeTypeInfo.LOCAL_DATE
+ || timePoint2.resultType == LocalTimeTypeInfo.LOCAL_DATE_TIME =>
ValidationSuccess
case _ =>
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/typeutils/TypeInfoCheckUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/typeutils/TypeInfoCheckUtils.scala
index 80f4a57..33108dd 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/typeutils/TypeInfoCheckUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/typeutils/TypeInfoCheckUtils.scala
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo, PojoTypeInfo}
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.planner.validate._
+import org.apache.flink.table.runtime.typeutils.{BigDecimalTypeInfo, DecimalTypeInfo}
import org.apache.flink.table.typeutils.TimeIntervalTypeInfo.{INTERVAL_MILLIS, INTERVAL_MONTHS}
import org.apache.flink.table.typeutils.{TimeIndicatorTypeInfo, TimeIntervalTypeInfo}
@@ -48,7 +49,7 @@ object TypeInfoCheckUtils {
def isNumeric(dataType: TypeInformation[_]): Boolean = dataType match {
case _: NumericTypeInfo[_] => true
- case BIG_DEC_TYPE_INFO => true
+ case BIG_DEC_TYPE_INFO | _: BigDecimalTypeInfo | _: DecimalTypeInfo => true
case _ => false
}
@@ -56,7 +57,7 @@ object TypeInfoCheckUtils {
isTimePoint(dataType) || isTimeInterval(dataType)
def isTimePoint(dataType: TypeInformation[_]): Boolean =
- dataType.isInstanceOf[SqlTimeTypeInfo[_]]
+ dataType.isInstanceOf[SqlTimeTypeInfo[_]] || dataType.isInstanceOf[LocalTimeTypeInfo[_]]
def isTimeInterval(dataType: TypeInformation[_]): Boolean =
dataType.isInstanceOf[TimeIntervalTypeInfo[_]]
@@ -103,7 +104,7 @@ object TypeInfoCheckUtils {
: ValidationResult = dataType match {
case _: NumericTypeInfo[_] =>
ValidationSuccess
- case BIG_DEC_TYPE_INFO =>
+ case BIG_DEC_TYPE_INFO | _: BigDecimalTypeInfo | _: DecimalTypeInfo =>
ValidationSuccess
case _ =>
ValidationFailure(s"$caller requires numeric types, get $dataType here")