You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/05 14:46:23 UTC

[flink] 04/05: [FLINK-13237][table-planner-blink] Add LocalTime and LocalDateTime support to planner expressions

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit beddbd4677306a8c975042d36f9c5597c55ca9ae
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Thu Jul 25 16:29:29 2019 +0800

    [FLINK-13237][table-planner-blink] Add LocalTime and LocalDateTime support to planner expressions
---
 .../expressions/PlannerExpressionConverter.scala    |  3 +--
 .../table/planner/expressions/comparison.scala      | 10 ++++++++++
 .../flink/table/planner/expressions/time.scala      | 21 ++++++++++++++++-----
 .../planner/typeutils/TypeInfoCheckUtils.scala      |  7 ++++---
 4 files changed, 31 insertions(+), 10 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
index 9b9fd36..90a8282 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
@@ -28,7 +28,6 @@ import org.apache.flink.table.planner.functions.InternalFunctionDefinitions.THRO
 import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
 import org.apache.flink.table.types.logical.LogicalTypeRoot.{CHAR, DECIMAL, SYMBOL, TIMESTAMP_WITHOUT_TIME_ZONE}
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks._
-import org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo
 
 import _root_.scala.collection.JavaConverters._
 
@@ -91,7 +90,7 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
         assert(children.size == 2)
         return ThrowException(
           children.head.accept(this),
-          fromDataTypeToLegacyInfo(
+          fromDataTypeToTypeInfo(
             children(1).asInstanceOf[TypeLiteralExpression].getOutputDataType))
 
       case _ =>
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/comparison.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/comparison.scala
index b0684df..b8769f2 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/comparison.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/comparison.scala
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable
 import org.apache.flink.table.planner.typeutils.TypeInfoCheckUtils.{isArray, isComparable, isNumeric}
 import org.apache.flink.table.planner.validate._
+import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
 
 import org.apache.calcite.sql.SqlOperator
 
@@ -34,6 +35,9 @@ abstract class BinaryComparison extends BinaryExpression {
     (left.resultType, right.resultType) match {
       case (lType, rType) if isNumeric(lType) && isNumeric(rType) => ValidationSuccess
       case (lType, rType) if isComparable(lType) && lType == rType => ValidationSuccess
+      case (lType, rType) if isComparable(lType) &&
+          fromTypeInfoToLogicalType(lType) == fromTypeInfoToLogicalType(rType) =>
+        ValidationSuccess
       case (lType, rType) =>
         ValidationFailure(
           s"Comparison is only supported for numeric types and " +
@@ -50,6 +54,9 @@ case class EqualTo(left: PlannerExpression, right: PlannerExpression) extends Bi
     (left.resultType, right.resultType) match {
       case (lType, rType) if isNumeric(lType) && isNumeric(rType) => ValidationSuccess
       case (lType, rType) if lType == rType => ValidationSuccess
+      case (lType, rType)
+        if fromTypeInfoToLogicalType(lType) == fromTypeInfoToLogicalType(rType) =>
+        ValidationSuccess
       case (lType, rType) if isArray(lType) && lType.getTypeClass == rType.getTypeClass =>
         ValidationSuccess
       case (lType, rType) =>
@@ -66,6 +73,9 @@ case class NotEqualTo(left: PlannerExpression, right: PlannerExpression) extends
     (left.resultType, right.resultType) match {
       case (lType, rType) if isNumeric(lType) && isNumeric(rType) => ValidationSuccess
       case (lType, rType) if lType == rType => ValidationSuccess
+      case (lType, rType)
+        if fromTypeInfoToLogicalType(lType) == fromTypeInfoToLogicalType(rType) =>
+        ValidationSuccess
       case (lType, rType) if isArray(lType) && lType.getTypeClass == rType.getTypeClass =>
         ValidationSuccess
       case (lType, rType) =>
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/time.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/time.scala
index 2429881..8817f5c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/time.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/time.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.planner.expressions
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo.{LocalTimeTypeInfo, SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.table.planner.calcite.FlinkRelBuilder
 import org.apache.flink.table.planner.expressions.PlannerTimeIntervalUnit.PlannerTimeIntervalUnit
 import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable
@@ -51,6 +51,8 @@ case class Extract(timeIntervalUnit: PlannerExpression, temporal: PlannerExpress
            | SymbolPlannerExpression(PlannerTimeIntervalUnit.DAY)
         if temporal.resultType == SqlTimeTypeInfo.DATE
           || temporal.resultType == SqlTimeTypeInfo.TIMESTAMP
+          || temporal.resultType == LocalTimeTypeInfo.LOCAL_DATE
+          || temporal.resultType == LocalTimeTypeInfo.LOCAL_DATE_TIME
           || temporal.resultType == TimeIntervalTypeInfo.INTERVAL_MILLIS
           || temporal.resultType == TimeIntervalTypeInfo.INTERVAL_MONTHS =>
         ValidationSuccess
@@ -60,6 +62,8 @@ case class Extract(timeIntervalUnit: PlannerExpression, temporal: PlannerExpress
            | SymbolPlannerExpression(PlannerTimeIntervalUnit.SECOND)
         if temporal.resultType == SqlTimeTypeInfo.TIME
           || temporal.resultType == SqlTimeTypeInfo.TIMESTAMP
+          || temporal.resultType == LocalTimeTypeInfo.LOCAL_TIME
+          || temporal.resultType == LocalTimeTypeInfo.LOCAL_DATE_TIME
           || temporal.resultType == TimeIntervalTypeInfo.INTERVAL_MILLIS =>
         ValidationSuccess
 
@@ -97,12 +101,15 @@ abstract class TemporalCeilFloor(
 
     (unit.get, temporal.resultType) match {
       case (PlannerTimeIntervalUnit.YEAR | PlannerTimeIntervalUnit.MONTH,
-          SqlTimeTypeInfo.DATE | SqlTimeTypeInfo.TIMESTAMP) =>
+          SqlTimeTypeInfo.DATE | SqlTimeTypeInfo.TIMESTAMP |
+          LocalTimeTypeInfo.LOCAL_DATE | LocalTimeTypeInfo.LOCAL_DATE_TIME) =>
         ValidationSuccess
-      case (PlannerTimeIntervalUnit.DAY, SqlTimeTypeInfo.TIMESTAMP) =>
+      case (PlannerTimeIntervalUnit.DAY, SqlTimeTypeInfo.TIMESTAMP |
+            LocalTimeTypeInfo.LOCAL_DATE_TIME) =>
         ValidationSuccess
       case (PlannerTimeIntervalUnit.HOUR | PlannerTimeIntervalUnit.MINUTE |
-          PlannerTimeIntervalUnit.SECOND, SqlTimeTypeInfo.TIME | SqlTimeTypeInfo.TIMESTAMP) =>
+          PlannerTimeIntervalUnit.SECOND, SqlTimeTypeInfo.TIME | SqlTimeTypeInfo.TIMESTAMP |
+          LocalTimeTypeInfo.LOCAL_TIME | LocalTimeTypeInfo.LOCAL_DATE_TIME) =>
         ValidationSuccess
       case _ =>
         ValidationFailure(s"Temporal ceil/floor operator does not support " +
@@ -308,7 +315,11 @@ case class TimestampDiff(
         if timePoint1.resultType == SqlTimeTypeInfo.DATE
           || timePoint1.resultType == SqlTimeTypeInfo.TIMESTAMP
           || timePoint2.resultType == SqlTimeTypeInfo.DATE
-          || timePoint2.resultType == SqlTimeTypeInfo.TIMESTAMP =>
+          || timePoint2.resultType == SqlTimeTypeInfo.TIMESTAMP
+          || timePoint1.resultType == LocalTimeTypeInfo.LOCAL_DATE
+          || timePoint1.resultType == LocalTimeTypeInfo.LOCAL_DATE_TIME
+          || timePoint2.resultType == LocalTimeTypeInfo.LOCAL_DATE
+          || timePoint2.resultType == LocalTimeTypeInfo.LOCAL_DATE_TIME =>
         ValidationSuccess
 
       case _ =>
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/typeutils/TypeInfoCheckUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/typeutils/TypeInfoCheckUtils.scala
index 80f4a57..33108dd 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/typeutils/TypeInfoCheckUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/typeutils/TypeInfoCheckUtils.scala
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo, PojoTypeInfo}
 import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.planner.validate._
+import org.apache.flink.table.runtime.typeutils.{BigDecimalTypeInfo, DecimalTypeInfo}
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo.{INTERVAL_MILLIS, INTERVAL_MONTHS}
 import org.apache.flink.table.typeutils.{TimeIndicatorTypeInfo, TimeIntervalTypeInfo}
 
@@ -48,7 +49,7 @@ object TypeInfoCheckUtils {
 
   def isNumeric(dataType: TypeInformation[_]): Boolean = dataType match {
     case _: NumericTypeInfo[_] => true
-    case BIG_DEC_TYPE_INFO => true
+    case BIG_DEC_TYPE_INFO | _: BigDecimalTypeInfo | _: DecimalTypeInfo => true
     case _ => false
   }
 
@@ -56,7 +57,7 @@ object TypeInfoCheckUtils {
     isTimePoint(dataType) || isTimeInterval(dataType)
 
   def isTimePoint(dataType: TypeInformation[_]): Boolean =
-    dataType.isInstanceOf[SqlTimeTypeInfo[_]]
+    dataType.isInstanceOf[SqlTimeTypeInfo[_]] || dataType.isInstanceOf[LocalTimeTypeInfo[_]]
 
   def isTimeInterval(dataType: TypeInformation[_]): Boolean =
     dataType.isInstanceOf[TimeIntervalTypeInfo[_]]
@@ -103,7 +104,7 @@ object TypeInfoCheckUtils {
   : ValidationResult = dataType match {
     case _: NumericTypeInfo[_] =>
       ValidationSuccess
-    case BIG_DEC_TYPE_INFO =>
+    case BIG_DEC_TYPE_INFO | _: BigDecimalTypeInfo | _: DecimalTypeInfo =>
       ValidationSuccess
     case _ =>
       ValidationFailure(s"$caller requires numeric types, get $dataType here")