You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/11 11:19:33 UTC

[flink] 03/05: [FLINK-13107][table-planner-blink] Derive sum, avg, div return type in planner expressions using behavior of blink

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b75d75afb9b78cea04b5d85a15dfef4e73ef01e0
Author: beyond1920 <be...@126.com>
AuthorDate: Fri Jul 5 14:29:22 2019 +0800

    [FLINK-13107][table-planner-blink] Derive sum, avg, div return type in planner expressions using behavior of blink
---
 .../flink/table/expressions/aggregations.scala     | 38 +++++++++++++++++-----
 .../flink/table/expressions/arithmetic.scala       | 10 ++++--
 2 files changed, 38 insertions(+), 10 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/aggregations.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
index d8edcec..27dadfc 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
@@ -19,9 +19,10 @@ package org.apache.flink.table.expressions
 
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.MultisetTypeInfo
+import org.apache.flink.table.calcite.FlinkTypeSystem
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
 import org.apache.flink.table.functions.{AggregateFunction, TableAggregateFunction, UserDefinedAggregateFunction}
-import org.apache.flink.table.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
+import org.apache.flink.table.types.TypeInfoLogicalTypeConverter.{fromLogicalTypeToTypeInfo, fromTypeInfoToLogicalType}
 import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
 import org.apache.flink.table.typeutils.TypeInfoCheckUtils
 import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
@@ -57,7 +58,10 @@ case class Sum(child: PlannerExpression) extends Aggregation {
   override private[flink] def children: Seq[PlannerExpression] = Seq(child)
   override def toString = s"sum($child)"
 
-  override private[flink] def resultType = child.resultType
+  override private[flink] def resultType = {
+    fromLogicalTypeToTypeInfo(FlinkTypeSystem.deriveSumType(
+      fromTypeInfoToLogicalType(child.resultType)))
+  }
 
   override private[flink] def validateInput() =
     TypeInfoCheckUtils.assertNumericExpr(child.resultType, "sum")
@@ -67,7 +71,10 @@ case class Sum0(child: PlannerExpression) extends Aggregation {
   override private[flink] def children: Seq[PlannerExpression] = Seq(child)
   override def toString = s"sum0($child)"
 
-  override private[flink] def resultType = child.resultType
+  override private[flink] def resultType = {
+    fromLogicalTypeToTypeInfo(FlinkTypeSystem.deriveSumType(
+      fromTypeInfoToLogicalType(child.resultType)))
+  }
 
   override private[flink] def validateInput() =
     TypeInfoCheckUtils.assertNumericExpr(child.resultType, "sum0")
@@ -104,7 +111,10 @@ case class Avg(child: PlannerExpression) extends Aggregation {
   override private[flink] def children: Seq[PlannerExpression] = Seq(child)
   override def toString = s"avg($child)"
 
-  override private[flink] def resultType = child.resultType
+  override private[flink] def resultType = {
+    fromLogicalTypeToTypeInfo(FlinkTypeSystem.deriveAvgAggType(
+      fromTypeInfoToLogicalType(child.resultType)))
+  }
 
   override private[flink] def validateInput() =
     TypeInfoCheckUtils.assertNumericExpr(child.resultType, "avg")
@@ -127,7 +137,10 @@ case class StddevPop(child: PlannerExpression) extends Aggregation {
   override private[flink] def children: Seq[PlannerExpression] = Seq(child)
   override def toString = s"stddev_pop($child)"
 
-  override private[flink] def resultType = child.resultType
+  override private[flink] def resultType = {
+    fromLogicalTypeToTypeInfo(FlinkTypeSystem.deriveAvgAggType(
+      fromTypeInfoToLogicalType(child.resultType)))
+  }
 
   override private[flink] def validateInput() =
     TypeInfoCheckUtils.assertNumericExpr(child.resultType, "stddev_pop")
@@ -137,7 +150,10 @@ case class StddevSamp(child: PlannerExpression) extends Aggregation {
   override private[flink] def children: Seq[PlannerExpression] = Seq(child)
   override def toString = s"stddev_samp($child)"
 
-  override private[flink] def resultType = child.resultType
+  override private[flink] def resultType = {
+    fromLogicalTypeToTypeInfo(FlinkTypeSystem.deriveAvgAggType(
+      fromTypeInfoToLogicalType(child.resultType)))
+  }
 
   override private[flink] def validateInput() =
     TypeInfoCheckUtils.assertNumericExpr(child.resultType, "stddev_samp")
@@ -147,7 +163,10 @@ case class VarPop(child: PlannerExpression) extends Aggregation {
   override private[flink] def children: Seq[PlannerExpression] = Seq(child)
   override def toString = s"var_pop($child)"
 
-  override private[flink] def resultType = child.resultType
+  override private[flink] def resultType = {
+    fromLogicalTypeToTypeInfo(FlinkTypeSystem.deriveAvgAggType(
+      fromTypeInfoToLogicalType(child.resultType)))
+  }
 
   override private[flink] def validateInput() =
     TypeInfoCheckUtils.assertNumericExpr(child.resultType, "var_pop")
@@ -157,7 +176,10 @@ case class VarSamp(child: PlannerExpression) extends Aggregation {
   override private[flink] def children: Seq[PlannerExpression] = Seq(child)
   override def toString = s"var_samp($child)"
 
-  override private[flink] def resultType = child.resultType
+  override private[flink] def resultType = {
+    fromLogicalTypeToTypeInfo(FlinkTypeSystem.deriveAvgAggType(
+      fromTypeInfoToLogicalType(child.resultType)))
+  }
 
   override private[flink] def validateInput() =
     TypeInfoCheckUtils.assertNumericExpr(child.resultType, "var_samp")
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/arithmetic.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/arithmetic.scala
index 1106e8a..726d9ff 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/arithmetic.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/arithmetic.scala
@@ -17,10 +17,10 @@
  */
 package org.apache.flink.table.expressions
 
-import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable
 import org.apache.flink.table.types.TypeInfoLogicalTypeConverter.{fromLogicalTypeToTypeInfo, fromTypeInfoToLogicalType}
-import org.apache.flink.table.typeutils.TypeCoercion
+import org.apache.flink.table.typeutils.{DecimalTypeInfo, TypeCoercion}
 import org.apache.flink.table.typeutils.TypeInfoCheckUtils._
 import org.apache.flink.table.validate._
 
@@ -117,6 +117,12 @@ case class Div(left: PlannerExpression, right: PlannerExpression) extends Binary
   override def toString = s"($left / $right)"
 
   private[flink] val sqlOperator = FlinkSqlOperatorTable.DIVIDE
+
+  override private[flink] def resultType: TypeInformation[_] =
+    super.resultType match {
+      case dt: DecimalTypeInfo => dt
+      case _ => BasicTypeInfo.DOUBLE_TYPE_INFO
+    }
 }
 
 case class Mul(left: PlannerExpression, right: PlannerExpression) extends BinaryArithmetic {