Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/09 03:17:13 UTC

[flink] 03/03: [FLINK-13523][table-planner-blink] Refactor AVG aggregate function to keep it compatible with old planner

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 421f0a559a3038d2f9f56ba2cbab8e6d1832812a
Author: Zhenghua Gao <do...@gmail.com>
AuthorDate: Fri Aug 2 12:05:26 2019 +0800

    [FLINK-13523][table-planner-blink] Refactor AVG aggregate function to keep it compatible with old planner
    
    The AVG aggregate function in the blink planner always returned a double/decimal result type, which is not standard.
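    
    With this change the result type of AVG follows its input, as in the
    old planner: TINYINT, SMALLINT, INT and BIGINT inputs keep their type
    (the internal sum uses BIGINT), FLOAT stays FLOAT (its sum is DOUBLE),
    DOUBLE stays DOUBLE, and DECIMAL keeps its derived decimal type.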
---
 .../functions/aggfunctions/AvgAggFunction.java     |  83 +++++++++++++-
 .../aggfunctions/DeclarativeAggregateFunction.java |   3 +
 .../table/planner/calcite/FlinkTypeSystem.scala    |   2 +-
 .../codegen/agg/DeclarativeAggCodeGen.scala        |   2 +-
 .../codegen/agg/batch/AggCodeGenHelper.scala       |   1 -
 .../codegen/agg/batch/HashAggCodeGenHelper.scala   |   1 -
 .../plan/rules/logical/SplitAggregateRule.scala    |  23 +++-
 .../planner/plan/utils/AggFunctionFactory.scala    |  15 ++-
 .../table/planner/codegen/agg/TestLongAvgFunc.java |   6 +-
 .../planner/plan/batch/sql/RemoveCollationTest.xml |   4 +-
 .../planner/plan/batch/sql/RemoveShuffleTest.xml   |  12 +-
 .../planner/plan/batch/sql/SubplanReuseTest.xml    |  10 +-
 .../batch/sql/agg/AggregateReduceGroupingTest.xml  |  12 +-
 .../plan/batch/sql/agg/HashAggregateTest.xml       |  12 +-
 .../plan/batch/sql/agg/OverAggregateTest.xml       |  20 ++--
 .../plan/batch/sql/agg/SortAggregateTest.xml       |  12 +-
 .../plan/batch/sql/agg/WindowAggregateTest.xml     |  18 +--
 .../logical/AggregateReduceGroupingRuleTest.xml    |  12 +-
 .../plan/rules/logical/SplitAggregateRuleTest.xml  |   4 +-
 .../rules/logical/WindowGroupReorderRuleTest.xml   |  24 ++--
 .../logical/subquery/SubQuerySemiJoinTest.xml      |  16 ++-
 .../planner/plan/stream/sql/agg/AggregateTest.xml  |   4 +-
 .../plan/stream/sql/agg/DistinctAggregateTest.xml  |  12 +-
 .../stream/sql/agg/IncrementalAggregateTest.xml    |   4 +-
 .../plan/stream/sql/agg/OverAggregateTest.xml      |   4 +-
 .../plan/stream/sql/agg/WindowAggregateTest.xml    |   6 +-
 .../planner/plan/stream/table/GroupWindowTest.xml  |   4 +-
 .../table/planner/codegen/agg/AggTestBase.scala    |   9 +-
 .../codegen/agg/AggsHandlerCodeGeneratorTest.scala |  13 +--
 .../codegen/agg/batch/AggWithoutKeysTest.scala     |   9 +-
 .../codegen/agg/batch/BatchAggTestBase.scala       |   8 +-
 .../agg/batch/HashAggCodeGeneratorTest.scala       |  13 +--
 .../agg/batch/SortAggCodeGeneratorTest.scala       |   8 +-
 .../planner/expressions/ScalarFunctionsTest.scala  |   4 +-
 .../metadata/AggCallSelectivityEstimatorTest.scala |   2 -
 .../planner/runtime/batch/sql/DecimalITCase.scala  |  19 ----
 .../planner/runtime/batch/sql/MiscITCase.scala     |   2 +-
 .../runtime/batch/sql/OverWindowITCase.scala       | 120 ++++++++++----------
 .../batch/sql/agg/AggregateITCaseBase.scala        |  26 ++---
 .../sql/agg/AggregateReduceGroupingITCase.scala    |  32 +++---
 .../runtime/batch/sql/agg/GroupingSetsITCase.scala | 120 ++++++++++----------
 .../batch/sql/agg/WindowAggregateITCase.scala      |  44 ++++----
 .../runtime/batch/table/AggregationITCase.scala    |  18 +--
 .../runtime/stream/sql/AggregateITCase.scala       |  12 +-
 .../runtime/stream/sql/MatchRecognizeITCase.scala  |   2 +-
 .../runtime/stream/sql/OverWindowITCase.scala      | 124 ++++++++++-----------
 .../planner/runtime/stream/sql/RankITCase.scala    |   8 +-
 .../runtime/stream/sql/SplitAggregateITCase.scala  |   4 +-
 .../runtime/stream/table/AggregateITCase.scala     |   6 +-
 .../runtime/stream/table/GroupWindowITCase.scala   |  10 +-
 .../stream/table/MiniBatchGroupWindowITCase.scala  |   8 +-
 .../runtime/stream/table/OverWindowITCase.scala    |  32 +++---
 .../flink/table/planner/utils/AvgAggFunction.scala |  24 ++--
 53 files changed, 535 insertions(+), 468 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/AvgAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/AvgAggFunction.java
index fab4f72..8e522a5 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/AvgAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/AvgAggFunction.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.types.logical.DecimalType;
 import java.math.BigDecimal;
 
 import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedRef;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.cast;
 import static org.apache.flink.table.planner.expressions.ExpressionBuilder.div;
 import static org.apache.flink.table.planner.expressions.ExpressionBuilder.equalTo;
 import static org.apache.flink.table.planner.expressions.ExpressionBuilder.ifThenElse;
@@ -36,6 +37,7 @@ import static org.apache.flink.table.planner.expressions.ExpressionBuilder.liter
 import static org.apache.flink.table.planner.expressions.ExpressionBuilder.minus;
 import static org.apache.flink.table.planner.expressions.ExpressionBuilder.nullOf;
 import static org.apache.flink.table.planner.expressions.ExpressionBuilder.plus;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.typeLiteral;
 
 /**
  * built-in avg aggregate function.
@@ -103,19 +105,51 @@ public abstract class AvgAggFunction extends DeclarativeAggregateFunction {
 	 */
 	@Override
 	public Expression getValueExpression() {
-		return ifThenElse(equalTo(count, literal(0L)), nullOf(getResultType()), div(sum, count));
+		Expression ifTrue = nullOf(getResultType());
+		Expression ifFalse = cast(div(sum, count), typeLiteral(getResultType()));
+		return ifThenElse(equalTo(count, literal(0L)), ifTrue, ifFalse);
 	}
 
 	/**
-	 * Built-in Int Avg aggregate function for integral arguments,
-	 * including BYTE, SHORT, INT, LONG.
-	 * The result type is DOUBLE.
+	 * Built-in Byte Avg aggregate function.
 	 */
-	public static class IntegralAvgAggFunction extends AvgAggFunction {
+	public static class ByteAvgAggFunction extends AvgAggFunction {
 
 		@Override
 		public DataType getResultType() {
-			return DataTypes.DOUBLE();
+			return DataTypes.TINYINT();
+		}
+
+		@Override
+		public DataType getSumType() {
+			return DataTypes.BIGINT();
+		}
+	}
+
+	/**
+	 * Built-in Short Avg aggregate function.
+	 */
+	public static class ShortAvgAggFunction extends AvgAggFunction {
+
+		@Override
+		public DataType getResultType() {
+			return DataTypes.SMALLINT();
+		}
+
+		@Override
+		public DataType getSumType() {
+			return DataTypes.BIGINT();
+		}
+	}
+
+	/**
+	 * Built-in Integer Avg aggregate function.
+	 */
+	public static class IntAvgAggFunction extends AvgAggFunction {
+
+		@Override
+		public DataType getResultType() {
+			return DataTypes.INT();
 		}
 
 		@Override
@@ -125,6 +159,43 @@ public abstract class AvgAggFunction extends DeclarativeAggregateFunction {
 	}
 
 	/**
+	 * Built-in Long Avg aggregate function.
+	 */
+	public static class LongAvgAggFunction extends AvgAggFunction {
+
+		@Override
+		public DataType getResultType() {
+			return DataTypes.BIGINT();
+		}
+
+		@Override
+		public DataType getSumType() {
+			return DataTypes.BIGINT();
+		}
+	}
+
+	/**
+	 * Built-in Float Avg aggregate function.
+	 */
+	public static class FloatAvgAggFunction extends AvgAggFunction {
+
+		@Override
+		public DataType getResultType() {
+			return DataTypes.FLOAT();
+		}
+
+		@Override
+		public DataType getSumType() {
+			return DataTypes.DOUBLE();
+		}
+
+		@Override
+		public Expression[] initialValuesExpressions() {
+			return new Expression[] {literal(0D), literal(0L)};
+		}
+	}
+
+	/**
 	 * Built-in Double Avg aggregate function.
 	 */
 	public static class DoubleAvgAggFunction extends AvgAggFunction {
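
A minimal sketch (not part of the patch) of how the refactored per-type
functions report their types; the class and method names are taken from
the diff above:

    // Each input type now gets its own AVG function: the result type
    // matches the input, while the sum type is widened to avoid overflow.
    AvgAggFunction avg = new AvgAggFunction.IntAvgAggFunction();
    DataType result = avg.getResultType(); // DataTypes.INT(), previously DOUBLE
    DataType sum = avg.getSumType();       // DataTypes.BIGINT()
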
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/DeclarativeAggregateFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/DeclarativeAggregateFunction.java
index c1c96b0..c58aec8 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/DeclarativeAggregateFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/DeclarativeAggregateFunction.java
@@ -45,6 +45,9 @@ import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unreso
  * defining {@link #initialValuesExpressions}, {@link #accumulateExpressions},
  * {@link #mergeExpressions} and {@link #getValueExpression}.
  *
+ * <p>Note: Developers of DeclarativeAggregateFunction should guarantee that the inferred type
+ * of {@link #getValueExpression} is the same as {@link #getResultType()}.
+ *
 * <p>See a full example: {@link AvgAggFunction}.
  */
 public abstract class DeclarativeAggregateFunction extends UserDefinedFunction {
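
For reference, the AvgAggFunction change in this commit satisfies the
contract noted above by casting the division to the declared result type
(copied from the diff earlier in this mail):

    @Override
    public Expression getValueExpression() {
        // NULL for an empty group; otherwise cast sum/count to getResultType()
        Expression ifTrue = nullOf(getResultType());
        Expression ifFalse = cast(div(sum, count), typeLiteral(getResultType()));
        return ifThenElse(equalTo(count, literal(0L)), ifTrue, ifFalse);
    }
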
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeSystem.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeSystem.scala
index 5be7d11..e837424 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeSystem.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeSystem.scala
@@ -85,7 +85,7 @@ object FlinkTypeSystem {
     case dt: DecimalType =>
       val result = inferAggAvgType(dt.getScale)
       new DecimalType(result.getPrecision, result.getScale)
-    case nt if TypeCheckUtils.isNumeric(nt) => new DoubleType()
+    case nt if TypeCheckUtils.isNumeric(nt) => nt
     case _ =>
       throw new RuntimeException("Unsupported argType for AVG(): " + argType)
   }
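
In other words, the AVG type derivation in FlinkTypeSystem now returns
the argument type itself for non-decimal numeric inputs (previously
always DOUBLE), while decimal inputs still go through inferAggAvgType,
e.g. DECIMAL(10, 5) derives DECIMAL(38, 6), as the plan tests below show.
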
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DeclarativeAggCodeGen.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DeclarativeAggCodeGen.scala
index 7ae4c03..30efc86 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DeclarativeAggCodeGen.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DeclarativeAggCodeGen.scala
@@ -27,8 +27,8 @@ import org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregat
 import org.apache.flink.table.planner.plan.utils.AggregateInfo
 import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
 import org.apache.flink.table.types.logical.LogicalType
-
 import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions
 
 import scala.collection.JavaConverters._
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala
index 91a0ba0..2b95509 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala
@@ -38,7 +38,6 @@ import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDat
 import org.apache.flink.table.types.DataType
 import org.apache.flink.table.types.logical.LogicalTypeRoot._
 import org.apache.flink.table.types.logical.{LogicalType, RowType}
-
 import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.tools.RelBuilder
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
index f436675..2fdf7b7 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
@@ -37,7 +37,6 @@ import org.apache.flink.table.runtime.operators.sort.BufferedKVExternalSorter
 import org.apache.flink.table.runtime.typeutils.BinaryRowSerializer
 import org.apache.flink.table.types.DataType
 import org.apache.flink.table.types.logical.{LogicalType, RowType}
-
 import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.tools.RelBuilder
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
index 524d5a3..ed6f538 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
@@ -27,7 +27,6 @@ import org.apache.flink.table.planner.plan.nodes.FlinkRelNode
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate
 import org.apache.flink.table.planner.plan.utils.AggregateUtil.doAllAggSupportSplit
 import org.apache.flink.table.planner.plan.utils.ExpandUtil
-
 import com.google.common.collect.ImmutableList
 import org.apache.calcite.plan.RelOptRule.{any, operand}
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
@@ -37,7 +36,7 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable._
 import org.apache.calcite.sql.fun.{SqlMinMaxAggFunction, SqlStdOperatorTable}
 import org.apache.calcite.sql.{SqlAggFunction, SqlKind}
 import org.apache.calcite.util.{ImmutableBitSet, ImmutableIntList}
-
+import java.math.{BigDecimal => JBigDecimal}
 import java.util
 
 import scala.collection.JavaConversions._
@@ -70,7 +69,8 @@ import scala.collection.JavaConversions._
   *
   * flink logical plan:
   * {{{
-  * FlinkLogicalCalc(select=[a, $f1, $f2, /($f3, $f4) AS $f3])
+  * FlinkLogicalCalc(select=[a, $f1, $f2, CAST(IF(=($f4, 0:BIGINT), null:INTEGER, /($f3, $f4))) AS
+  *     $f3])
   * +- FlinkLogicalAggregate(group=[{0}], agg#0=[SUM($2)], agg#1=[$SUM0($3)], agg#2=[$SUM0($4)],
   *        agg#3=[$SUM0($5)])
   *    +- FlinkLogicalAggregate(group=[{0, 3}], agg#0=[SUM($1) FILTER $4], agg#1=[COUNT(DISTINCT $2)
@@ -303,7 +303,22 @@ class SplitAggregateRule extends RelOptRule(
             aggGroupCount + index + avgAggCount + 1,
             finalAggregate.getRowType)
           avgAggCount += 1
-          relBuilder.call(FlinkSqlOperatorTable.DIVIDE, sumInputRef, countInputRef)
+          // Make a guarantee that the final aggregation returns NULL if the underlying count is ZERO.
+          // We use SUM0 for the underlying sum, which may run into ZERO / ZERO,
+          // in which case a division-by-zero exception would occur.
+          // @see Glossary#SQL2011 SQL:2011 Part 2 Section 6.27
+          val equals = relBuilder.call(
+            FlinkSqlOperatorTable.EQUALS,
+            countInputRef,
+            relBuilder.getRexBuilder.makeBigintLiteral(JBigDecimal.valueOf(0)))
+          val ifTrue = relBuilder.cast(
+            relBuilder.getRexBuilder.constantNull(), aggCall.`type`.getSqlTypeName)
+          val ifFalse = relBuilder.call(FlinkSqlOperatorTable.DIVIDE, sumInputRef, countInputRef)
+          relBuilder.call(
+            FlinkSqlOperatorTable.IF,
+            equals,
+            ifTrue,
+            ifFalse)
         } else {
           RexInputRef.of(aggGroupCount + index + avgAggCount, finalAggregate.getRowType)
         }
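
Concretely, in the scaladoc example above the final Calc now produces
CAST(IF(=($f4, 0:BIGINT), null:INTEGER, /($f3, $f4))) instead of a bare
/($f3, $f4): when the COUNT of a split AVG ($f4) is zero, the result is
NULL rather than an attempted 0 / 0.
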
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
index d5f9e56..8a8c901 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
@@ -37,7 +37,6 @@ import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter
 import org.apache.flink.table.runtime.typeutils.DecimalTypeInfo
 import org.apache.flink.table.types.logical.LogicalTypeRoot._
 import org.apache.flink.table.types.logical._
-
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.sql.fun._
@@ -140,9 +139,17 @@ class AggFunctionFactory(
 
   private def createAvgAggFunction(argTypes: Array[LogicalType]): UserDefinedFunction = {
     argTypes(0).getTypeRoot match {
-      case TINYINT | SMALLINT | INTEGER | BIGINT =>
-        new AvgAggFunction.IntegralAvgAggFunction
-      case FLOAT | DOUBLE =>
+      case TINYINT =>
+        new AvgAggFunction.ByteAvgAggFunction
+      case SMALLINT =>
+        new AvgAggFunction.ShortAvgAggFunction
+      case INTEGER =>
+        new AvgAggFunction.IntAvgAggFunction
+      case BIGINT =>
+        new AvgAggFunction.LongAvgAggFunction
+      case FLOAT =>
+        new AvgAggFunction.FloatAvgAggFunction
+      case DOUBLE =>
         new AvgAggFunction.DoubleAvgAggFunction
       case DECIMAL =>
         val d = argTypes(0).asInstanceOf[DecimalType]
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/codegen/agg/TestLongAvgFunc.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/codegen/agg/TestLongAvgFunc.java
index afc778a..7f28d6b 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/codegen/agg/TestLongAvgFunc.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/codegen/agg/TestLongAvgFunc.java
@@ -27,7 +27,7 @@ import org.apache.flink.table.functions.AggregateFunction;
 /**
  * Test avg agg function.
  */
-public class TestLongAvgFunc extends AggregateFunction<Double, Tuple2<Long, Long>> {
+public class TestLongAvgFunc extends AggregateFunction<Long, Tuple2<Long, Long>> {
 
 	@Override
 	public Tuple2<Long, Long> createAccumulator() {
@@ -56,11 +56,11 @@ public class TestLongAvgFunc extends AggregateFunction<Double, Tuple2<Long, Long
 	}
 
 	@Override
-	public Double getValue(Tuple2<Long, Long> acc) {
+	public Long getValue(Tuple2<Long, Long> acc) {
 		if (acc.f1 == 0) {
 			return null;
 		} else {
-			return ((double) acc.f0 / acc.f1);
+			return acc.f0 / acc.f1;
 		}
 	}
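
Note that the test function now returns Long and uses integer division
(acc.f0 / acc.f1), matching the refactored LongAvgAggFunction whose
result type is BIGINT.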
 
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.xml
index 76b6419..401483c 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.xml
@@ -502,7 +502,7 @@ SortMergeJoin(joinType=[LeftOuterJoin], where=[AND(=(a1, d1), =(b1, e1))], selec
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(sum_b=[$1], avg_b=[/(CAST(CASE(>(COUNT($1) OVER (PARTITION BY $0 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), 0), $SUM0($1) OVER (PARTITION BY $0 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), null:BIGINT)):DOUBLE, COUNT($1) OVER (PARTITION BY $0 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW))], rn=[RANK() OVER (PARTITION BY $0 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING A [...]
+LogicalProject(sum_b=[$1], avg_b=[/(CASE(>(COUNT($1) OVER (PARTITION BY $0 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), 0), $SUM0($1) OVER (PARTITION BY $0 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), null:BIGINT), COUNT($1) OVER (PARTITION BY $0 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW))], rn=[RANK() OVER (PARTITION BY $0 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
 +- LogicalAggregate(group=[{0}], sum_b=[SUM($1)])
    +- LogicalProject(a=[$0], b=[$1])
       +- LogicalTableScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]])
@@ -510,7 +510,7 @@ LogicalProject(sum_b=[$1], avg_b=[/(CAST(CASE(>(COUNT($1) OVER (PARTITION BY $0
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[sum_b, /(CAST(CASE(>(w0$o0, 0:BIGINT), w0$o1, null:BIGINT)), w0$o0) AS avg_b, w0$o2 AS rn])
+Calc(select=[sum_b, /(CASE(>(w0$o0, 0:BIGINT), w0$o1, null:BIGINT), w0$o0) AS avg_b, w0$o2 AS rn])
 +- OverAggregate(partitionBy=[a], orderBy=[a ASC], window#0=[COUNT(sum_b) AS w0$o0, $SUM0(sum_b) AS w0$o1, RANK(*) AS w0$o2 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], window#1=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, sum_b, w0$o0, w0$o1, w0$o2])
    +- SortAggregate(isMerge=[true], groupBy=[a], select=[a, Final_SUM(sum$0) AS sum_b])
       +- Sort(orderBy=[a ASC])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveShuffleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveShuffleTest.xml
index 60ca0ec..8d971ce 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveShuffleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveShuffleTest.xml
@@ -554,7 +554,7 @@ HashJoin(joinType=[InnerJoin], where=[AND(=(a, d), =(b, cnt))], select=[a, b, c,
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(sum_b=[$2], avg_b=[/(CAST(CASE(>(COUNT($2) OVER (PARTITION BY $0, $1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), 0), $SUM0($2) OVER (PARTITION BY $0, $1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), null:BIGINT)):DOUBLE, COUNT($2) OVER (PARTITION BY $0, $1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING))], rn=[RANK() OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST, $1 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)], c=[$1])
+LogicalProject(sum_b=[$2], avg_b=[/(CASE(>(COUNT($2) OVER (PARTITION BY $0, $1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), 0), $SUM0($2) OVER (PARTITION BY $0, $1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), null:BIGINT), COUNT($2) OVER (PARTITION BY $0, $1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING))], rn=[RANK() OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST, $1 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)], c=[$1])
 +- LogicalAggregate(group=[{0, 1}], sum_b=[SUM($2)])
    +- LogicalProject(a=[$0], c=[$2], b=[$1])
       +- LogicalTableScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]])
@@ -562,7 +562,7 @@ LogicalProject(sum_b=[$2], avg_b=[/(CAST(CASE(>(COUNT($2) OVER (PARTITION BY $0,
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[sum_b, /(CAST(CASE(>(w0$o0, 0:BIGINT), w0$o1, null:BIGINT)), w0$o0) AS avg_b, w1$o0 AS rn, c])
+Calc(select=[sum_b, /(CASE(>(w0$o0, 0:BIGINT), w0$o1, null:BIGINT), w0$o0) AS avg_b, w1$o0 AS rn, c])
 +- OverAggregate(partitionBy=[c], orderBy=[a ASC, c ASC], window#0=[RANK(*) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, sum_b, w0$o0, w0$o1, w1$o0])
    +- Sort(orderBy=[c ASC, a ASC])
       +- Exchange(distribution=[hash[c]])
@@ -630,7 +630,7 @@ NestedLoopJoin(joinType=[InnerJoin], where=[=(a, d0)], select=[a, b, c, d, e, f,
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(sum_b=[$1], avg_b=[/(CAST(CASE(>(COUNT($1) OVER (PARTITION BY $0 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), 0), $SUM0($1) OVER (PARTITION BY $0 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), null:BIGINT)):DOUBLE, COUNT($1) OVER (PARTITION BY $0 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING))], rn=[RANK() OVER (PARTITION BY $0 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)], c=[$0])
+LogicalProject(sum_b=[$1], avg_b=[/(CASE(>(COUNT($1) OVER (PARTITION BY $0 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), 0), $SUM0($1) OVER (PARTITION BY $0 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), null:BIGINT), COUNT($1) OVER (PARTITION BY $0 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING))], rn=[RANK() OVER (PARTITION BY $0 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)], c=[$0])
 +- LogicalAggregate(group=[{0}], sum_b=[SUM($1)])
    +- LogicalProject(c=[$2], b=[$1])
       +- LogicalTableScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]])
@@ -638,7 +638,7 @@ LogicalProject(sum_b=[$1], avg_b=[/(CAST(CASE(>(COUNT($1) OVER (PARTITION BY $0
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[sum_b, /(CAST(CASE(>(w0$o0, 0:BIGINT), w0$o1, null:BIGINT)), w0$o0) AS avg_b, w1$o0 AS rn, c])
+Calc(select=[sum_b, /(CASE(>(w0$o0, 0:BIGINT), w0$o1, null:BIGINT), w0$o0) AS avg_b, w1$o0 AS rn, c])
 +- OverAggregate(partitionBy=[c], orderBy=[], window#0=[COUNT(sum_b) AS w0$o0, $SUM0(sum_b) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], window#1=[RANK(*) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[c, sum_b, w0$o0, w0$o1, w1$o0])
    +- Sort(orderBy=[c ASC])
       +- HashAggregate(isMerge=[true], groupBy=[c], select=[c, Final_SUM(sum$0) AS sum_b])
@@ -663,7 +663,7 @@ Calc(select=[sum_b, /(CAST(CASE(>(w0$o0, 0:BIGINT), w0$o1, null:BIGINT)), w0$o0)
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(sum_b=[$2], avg_b=[/(CAST(CASE(>(COUNT($2) OVER (PARTITION BY $1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), 0), $SUM0($2) OVER (PARTITION BY $1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), null:BIGINT)):DOUBLE, COUNT($2) OVER (PARTITION BY $1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING))], rn=[RANK() OVER (PARTITION BY $1 ORDER BY $1 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)], c=[$1])
+LogicalProject(sum_b=[$2], avg_b=[/(CASE(>(COUNT($2) OVER (PARTITION BY $1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), 0), $SUM0($2) OVER (PARTITION BY $1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), null:BIGINT), COUNT($2) OVER (PARTITION BY $1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING))], rn=[RANK() OVER (PARTITION BY $1 ORDER BY $1 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)], c=[$1])
 +- LogicalAggregate(group=[{0, 1}], sum_b=[SUM($2)])
    +- LogicalProject(a=[$0], c=[$2], b=[$1])
       +- LogicalTableScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]])
@@ -671,7 +671,7 @@ LogicalProject(sum_b=[$2], avg_b=[/(CAST(CASE(>(COUNT($2) OVER (PARTITION BY $1
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[sum_b, /(CAST(CASE(>(w0$o0, 0:BIGINT), w0$o1, null:BIGINT)), w0$o0) AS avg_b, w1$o0 AS rn, c])
+Calc(select=[sum_b, /(CASE(>(w0$o0, 0:BIGINT), w0$o1, null:BIGINT), w0$o0) AS avg_b, w1$o0 AS rn, c])
 +- OverAggregate(partitionBy=[c], orderBy=[], window#0=[COUNT(sum_b) AS w0$o0, $SUM0(sum_b) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], window#1=[RANK(*) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[c, sum_b, w0$o0, w0$o1, w1$o0])
    +- Calc(select=[c, sum_b])
       +- Sort(orderBy=[c ASC])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.xml
index 9629fb3..d36ed2b 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.xml
@@ -213,21 +213,21 @@ LogicalProject(c=[$0], e=[$1], avg_b=[$2], sum_b=[$3], psum=[$4], nsum=[$5], avg
       +- LogicalFilter(condition=[AND(=($3, $8), =($3, $13), =($4, $9), =($4, $14), =($2, +($7, 1)), =($2, -($12, 1)))])
          +- LogicalJoin(condition=[true], joinType=[inner])
             :- LogicalJoin(condition=[true], joinType=[inner])
-            :  :- LogicalProject(sum_b=[$2], avg_b=[/(CAST(CASE(>(COUNT($2) OVER (PARTITION BY $0, $1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), 0), $SUM0($2) OVER (PARTITION BY $0, $1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), null:BIGINT)):DOUBLE, COUNT($2) OVER (PARTITION BY $0, $1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING))], rn=[RANK() OVER (PARTITION BY $0, $1 ORDER BY $0 NULLS FIRST, $1 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING [...]
+            :  :- LogicalProject(sum_b=[$2], avg_b=[/(CASE(>(COUNT($2) OVER (PARTITION BY $0, $1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), 0), $SUM0($2) OVER (PARTITION BY $0, $1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), null:BIGINT), COUNT($2) OVER (PARTITION BY $0, $1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING))], rn=[RANK() OVER (PARTITION BY $0, $1 ORDER BY $0 NULLS FIRST, $1 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT  [...]
             :  :  +- LogicalAggregate(group=[{0, 1}], sum_b=[SUM($2)])
             :  :     +- LogicalProject(c=[$2], e=[$4], b=[$1])
             :  :        +- LogicalFilter(condition=[AND(=($0, $3), IS NOT NULL($2), >($4, 10))])
             :  :           +- LogicalJoin(condition=[true], joinType=[inner])
             :  :              :- LogicalTableScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]])
             :  :              +- LogicalTableScan(table=[[default_catalog, default_database, y, source: [TestTableSource(d, e, f)]]])
-            :  +- LogicalProject(sum_b=[$2], avg_b=[/(CAST(CASE(>(COUNT($2) OVER (PARTITION BY $0, $1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), 0), $SUM0($2) OVER (PARTITION BY $0, $1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), null:BIGINT)):DOUBLE, COUNT($2) OVER (PARTITION BY $0, $1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING))], rn=[RANK() OVER (PARTITION BY $0, $1 ORDER BY $0 NULLS FIRST, $1 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING [...]
+            :  +- LogicalProject(sum_b=[$2], avg_b=[/(CASE(>(COUNT($2) OVER (PARTITION BY $0, $1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), 0), $SUM0($2) OVER (PARTITION BY $0, $1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), null:BIGINT), COUNT($2) OVER (PARTITION BY $0, $1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING))], rn=[RANK() OVER (PARTITION BY $0, $1 ORDER BY $0 NULLS FIRST, $1 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT  [...]
             :     +- LogicalAggregate(group=[{0, 1}], sum_b=[SUM($2)])
             :        +- LogicalProject(c=[$2], e=[$4], b=[$1])
             :           +- LogicalFilter(condition=[AND(=($0, $3), IS NOT NULL($2), >($4, 10))])
             :              +- LogicalJoin(condition=[true], joinType=[inner])
             :                 :- LogicalTableScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]])
             :                 +- LogicalTableScan(table=[[default_catalog, default_database, y, source: [TestTableSource(d, e, f)]]])
-            +- LogicalProject(sum_b=[$2], avg_b=[/(CAST(CASE(>(COUNT($2) OVER (PARTITION BY $0, $1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), 0), $SUM0($2) OVER (PARTITION BY $0, $1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), null:BIGINT)):DOUBLE, COUNT($2) OVER (PARTITION BY $0, $1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING))], rn=[RANK() OVER (PARTITION BY $0, $1 ORDER BY $0 NULLS FIRST, $1 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AN [...]
+            +- LogicalProject(sum_b=[$2], avg_b=[/(CASE(>(COUNT($2) OVER (PARTITION BY $0, $1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), 0), $SUM0($2) OVER (PARTITION BY $0, $1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), null:BIGINT), COUNT($2) OVER (PARTITION BY $0, $1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING))], rn=[RANK() OVER (PARTITION BY $0, $1 ORDER BY $0 NULLS FIRST, $1 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW [...]
                +- LogicalAggregate(group=[{0, 1}], sum_b=[SUM($2)])
                   +- LogicalProject(c=[$2], e=[$4], b=[$1])
                      +- LogicalFilter(condition=[AND(=($0, $3), IS NOT NULL($2), >($4, 10))])
@@ -243,7 +243,7 @@ Calc(select=[c, e, avg_b, sum_b, sum_b0 AS psum, sum_b1 AS nsum, avg_b0 AS avg_b
    :- Calc(select=[sum_b, avg_b, rn, c, e, sum_b0, avg_b0])
    :  +- HashJoin(joinType=[InnerJoin], where=[AND(=(c, c0), =(e, e0), =(rn, $f5))], select=[sum_b, avg_b, rn, c, e, sum_b0, avg_b0, c0, e0, $f5], build=[left])
    :     :- Exchange(distribution=[hash[c, e, rn]])
-   :     :  +- Calc(select=[sum_b, /(CAST(CASE(>(w0$o0, 0:BIGINT), w0$o1, null:BIGINT)), w0$o0) AS avg_b, w1$o0 AS rn, c, e], where=[AND(<>(c, _UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), >(-(sum_b, /(CAST(CASE(>(w0$o0, 0:BIGINT), w0$o1, null:BIGINT)), w0$o0)), 3))])
+   :     :  +- Calc(select=[sum_b, /(CASE(>(w0$o0, 0:BIGINT), w0$o1, null:BIGINT), w0$o0) AS avg_b, w1$o0 AS rn, c, e], where=[AND(<>(c, _UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), >(-(sum_b, /(CASE(>(w0$o0, 0:BIGINT), w0$o1, null:BIGINT), w0$o0)), 3))])
    :     :     +- OverAggregate(partitionBy=[c, e], orderBy=[], window#0=[COUNT(sum_b) AS w0$o0, $SUM0(sum_b) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], window#1=[RANK(*) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[c, e, sum_b, w0$o0, w0$o1, w1$o0], reuse_id=[1])
    :     :        +- Sort(orderBy=[c ASC, e ASC], reuse_id=[2])
    :     :           +- HashAggregate(isMerge=[true], groupBy=[c, e], select=[c, e, Final_SUM(sum$0) AS sum_b])
@@ -258,7 +258,7 @@ Calc(select=[c, e, avg_b, sum_b, sum_b0 AS psum, sum_b1 AS nsum, avg_b0 AS avg_b
    :     :                             +- Calc(select=[d, e], where=[>(e, 10)])
    :     :                                +- TableSourceScan(table=[[default_catalog, default_database, y, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
    :     +- Exchange(distribution=[hash[c, e, $f5]], shuffle_mode=[BATCH])
-   :        +- Calc(select=[sum_b, /(CAST(CASE(>(w0$o0, 0:BIGINT), w0$o1, null:BIGINT)), w0$o0) AS avg_b, c, e, +(w1$o0, 1) AS $f5])
+   :        +- Calc(select=[sum_b, /(CASE(>(w0$o0, 0:BIGINT), w0$o1, null:BIGINT), w0$o0) AS avg_b, c, e, +(w1$o0, 1) AS $f5])
    :           +- Reused(reference_id=[1])
    +- Exchange(distribution=[hash[c, e, $f5]], shuffle_mode=[BATCH])
       +- Calc(select=[sum_b, c, e, -(w0$o0, 1) AS $f5])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml
index 475744b..7139daf 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml
@@ -410,9 +410,9 @@ Calc(select=[a4, c4, s, EXPR$3])
 +- HashAggregate(isMerge=[true], groupBy=[a4, s], auxGrouping=[c4], select=[a4, s, c4, Final_COUNT(count$0) AS EXPR$3])
    +- Exchange(distribution=[hash[a4, s]])
       +- LocalHashAggregate(groupBy=[a4, s], auxGrouping=[c4], select=[a4, s, c4, Partial_COUNT(b4) AS count$0])
-         +- Calc(select=[a4, c4, w$start AS s, /(-($f2, /(*(CAST($f3), CAST($f3)), $f4)), $f4) AS b4])
+         +- Calc(select=[a4, c4, w$start AS s, CAST(/(-($f2, /(*($f3, $f3), $f4)), $f4)) AS b4])
             +- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[a4, c4 AS $f2, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4) AS $f4])
-               +- Calc(select=[a4, c4, d4, b4, *(CAST(b4), CAST(b4)) AS $f4])
+               +- Calc(select=[a4, c4, d4, b4, *(b4, b4) AS $f4])
                   +- Exchange(distribution=[hash[a4]])
                      +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4, b4, c4, d4])
 ]]>
@@ -437,9 +437,9 @@ Calc(select=[a4, c4, e, EXPR$3])
 +- HashAggregate(isMerge=[true], groupBy=[a4, e], auxGrouping=[c4], select=[a4, e, c4, Final_COUNT(count$0) AS EXPR$3])
    +- Exchange(distribution=[hash[a4, e]])
       +- LocalHashAggregate(groupBy=[a4, e], auxGrouping=[c4], select=[a4, e, c4, Partial_COUNT(b4) AS count$0])
-         +- Calc(select=[a4, c4, w$end AS e, /(-($f2, /(*(CAST($f3), CAST($f3)), $f4)), $f4) AS b4])
+         +- Calc(select=[a4, c4, w$end AS e, CAST(/(-($f2, /(*($f3, $f3), $f4)), $f4)) AS b4])
             +- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[a4, c4 AS $f2, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4) AS $f4])
-               +- Calc(select=[a4, c4, d4, b4, *(CAST(b4), CAST(b4)) AS $f4])
+               +- Calc(select=[a4, c4, d4, b4, *(b4, b4) AS $f4])
                   +- Exchange(distribution=[hash[a4]])
                      +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4, b4, c4, d4])
 ]]>
@@ -463,9 +463,9 @@ LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()])
 HashAggregate(isMerge=[true], groupBy=[a4, b4], auxGrouping=[c4], select=[a4, b4, c4, Final_COUNT(count1$0) AS EXPR$3])
 +- Exchange(distribution=[hash[a4, b4]])
    +- LocalHashAggregate(groupBy=[a4, b4], auxGrouping=[c4], select=[a4, b4, c4, Partial_COUNT(*) AS count1$0])
-      +- Calc(select=[a4, /(-($f2, /(*(CAST($f3), CAST($f3)), $f4)), $f4) AS b4, c4])
+      +- Calc(select=[a4, CAST(/(-($f2, /(*($f3, $f3), $f4)), $f4)) AS b4, c4])
          +- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], window=[TumblingGroupWindow], select=[a4, c4 AS $f2, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4) AS $f4])
-            +- Calc(select=[a4, c4, d4, b4, *(CAST(b4), CAST(b4)) AS $f4])
+            +- Calc(select=[a4, c4, d4, b4, *(b4, b4) AS $f4])
                +- Exchange(distribution=[hash[a4]])
                   +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4, b4, c4, d4])
 ]]>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml
index edfa8a9..3734051 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml
@@ -337,14 +337,14 @@ FROM MyTable
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{}], EXPR$0=[AVG($0)], EXPR$1=[AVG($1)], EXPR$2=[AVG($2)], EXPR$3=[AVG($3)], EXPR$4=[AVG($4)], EXPR$5=[AVG($5)], EXPR$6=[AVG($6)], EXPR$7=[AVG($7)]), rowType=[RecordType(DOUBLE EXPR$0, DOUBLE EXPR$1, DOUBLE EXPR$2, DOUBLE EXPR$3, DOUBLE EXPR$4, DOUBLE EXPR$5, DECIMAL(38, 20) EXPR$6, DECIMAL(38, 6) EXPR$7)]
+LogicalAggregate(group=[{}], EXPR$0=[AVG($0)], EXPR$1=[AVG($1)], EXPR$2=[AVG($2)], EXPR$3=[AVG($3)], EXPR$4=[AVG($4)], EXPR$5=[AVG($5)], EXPR$6=[AVG($6)], EXPR$7=[AVG($7)]), rowType=[RecordType(TINYINT EXPR$0, SMALLINT EXPR$1, INTEGER EXPR$2, BIGINT EXPR$3, FLOAT EXPR$4, DOUBLE EXPR$5, DECIMAL(38, 20) EXPR$6, DECIMAL(38, 6) EXPR$7)]
 +- LogicalProject(byte=[$0], short=[$1], int=[$2], long=[$3], float=[$4], double=[$5], decimal3020=[$11], decimal105=[$12]), rowType=[RecordType(TINYINT byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(byte, short, int, long, float, double, boolean, string, date, time, timestamp, decimal3020, decimal105)]]]), rowType=[RecordType(TINYINT byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, TIMESTAMP(3) timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-HashAggregate(isMerge=[true], select=[Final_AVG(sum$0, count$1) AS EXPR$0, Final_AVG(sum$2, count$3) AS EXPR$1, Final_AVG(sum$4, count$5) AS EXPR$2, Final_AVG(sum$6, count$7) AS EXPR$3, Final_AVG(sum$8, count$9) AS EXPR$4, Final_AVG(sum$10, count$11) AS EXPR$5, Final_AVG(sum$12, count$13) AS EXPR$6, Final_AVG(sum$14, count$15) AS EXPR$7]), rowType=[RecordType(DOUBLE EXPR$0, DOUBLE EXPR$1, DOUBLE EXPR$2, DOUBLE EXPR$3, DOUBLE EXPR$4, DOUBLE EXPR$5, DECIMAL(38, 20) EXPR$6, DECIMAL(38, 6) EXPR$7)]
+HashAggregate(isMerge=[true], select=[Final_AVG(sum$0, count$1) AS EXPR$0, Final_AVG(sum$2, count$3) AS EXPR$1, Final_AVG(sum$4, count$5) AS EXPR$2, Final_AVG(sum$6, count$7) AS EXPR$3, Final_AVG(sum$8, count$9) AS EXPR$4, Final_AVG(sum$10, count$11) AS EXPR$5, Final_AVG(sum$12, count$13) AS EXPR$6, Final_AVG(sum$14, count$15) AS EXPR$7]), rowType=[RecordType(TINYINT EXPR$0, SMALLINT EXPR$1, INTEGER EXPR$2, BIGINT EXPR$3, FLOAT EXPR$4, DOUBLE EXPR$5, DECIMAL(38, 20) EXPR$6, DECIMAL(38, 6 [...]
 +- Exchange(distribution=[single]), rowType=[RecordType(BIGINT sum$0, BIGINT count$1, BIGINT sum$2, BIGINT count$3, BIGINT sum$4, BIGINT count$5, BIGINT sum$6, BIGINT count$7, DOUBLE sum$8, BIGINT count$9, DOUBLE sum$10, BIGINT count$11, DECIMAL(38, 20) sum$12, BIGINT count$13, DECIMAL(38, 5) sum$14, BIGINT count$15)]
    +- LocalHashAggregate(select=[Partial_AVG(byte) AS (sum$0, count$1), Partial_AVG(short) AS (sum$2, count$3), Partial_AVG(int) AS (sum$4, count$5), Partial_AVG(long) AS (sum$6, count$7), Partial_AVG(float) AS (sum$8, count$9), Partial_AVG(double) AS (sum$10, count$11), Partial_AVG(decimal3020) AS (sum$12, count$13), Partial_AVG(decimal105) AS (sum$14, count$15)]), rowType=[RecordType(BIGINT sum$0, BIGINT count$1, BIGINT sum$2, BIGINT count$3, BIGINT sum$4, BIGINT count$5, BIGINT sum$6, [...]
       +- Calc(select=[byte, short, int, long, float, double, decimal3020, decimal105]), rowType=[RecordType(TINYINT byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
@@ -368,14 +368,14 @@ FROM MyTable
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{}], EXPR$0=[AVG($0)], EXPR$1=[AVG($1)], EXPR$2=[AVG($2)], EXPR$3=[AVG($3)], EXPR$4=[AVG($4)], EXPR$5=[AVG($5)], EXPR$6=[AVG($6)], EXPR$7=[AVG($7)]), rowType=[RecordType(DOUBLE EXPR$0, DOUBLE EXPR$1, DOUBLE EXPR$2, DOUBLE EXPR$3, DOUBLE EXPR$4, DOUBLE EXPR$5, DECIMAL(38, 20) EXPR$6, DECIMAL(38, 6) EXPR$7)]
+LogicalAggregate(group=[{}], EXPR$0=[AVG($0)], EXPR$1=[AVG($1)], EXPR$2=[AVG($2)], EXPR$3=[AVG($3)], EXPR$4=[AVG($4)], EXPR$5=[AVG($5)], EXPR$6=[AVG($6)], EXPR$7=[AVG($7)]), rowType=[RecordType(TINYINT EXPR$0, SMALLINT EXPR$1, INTEGER EXPR$2, BIGINT EXPR$3, FLOAT EXPR$4, DOUBLE EXPR$5, DECIMAL(38, 20) EXPR$6, DECIMAL(38, 6) EXPR$7)]
 +- LogicalProject(byte=[$0], short=[$1], int=[$2], long=[$3], float=[$4], double=[$5], decimal3020=[$11], decimal105=[$12]), rowType=[RecordType(TINYINT byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(byte, short, int, long, float, double, boolean, string, date, time, timestamp, decimal3020, decimal105)]]]), rowType=[RecordType(TINYINT byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, TIMESTAMP(3) timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-HashAggregate(isMerge=[false], select=[AVG(byte) AS EXPR$0, AVG(short) AS EXPR$1, AVG(int) AS EXPR$2, AVG(long) AS EXPR$3, AVG(float) AS EXPR$4, AVG(double) AS EXPR$5, AVG(decimal3020) AS EXPR$6, AVG(decimal105) AS EXPR$7]), rowType=[RecordType(DOUBLE EXPR$0, DOUBLE EXPR$1, DOUBLE EXPR$2, DOUBLE EXPR$3, DOUBLE EXPR$4, DOUBLE EXPR$5, DECIMAL(38, 20) EXPR$6, DECIMAL(38, 6) EXPR$7)]
+HashAggregate(isMerge=[false], select=[AVG(byte) AS EXPR$0, AVG(short) AS EXPR$1, AVG(int) AS EXPR$2, AVG(long) AS EXPR$3, AVG(float) AS EXPR$4, AVG(double) AS EXPR$5, AVG(decimal3020) AS EXPR$6, AVG(decimal105) AS EXPR$7]), rowType=[RecordType(TINYINT EXPR$0, SMALLINT EXPR$1, INTEGER EXPR$2, BIGINT EXPR$3, FLOAT EXPR$4, DOUBLE EXPR$5, DECIMAL(38, 20) EXPR$6, DECIMAL(38, 6) EXPR$7)]
 +- Exchange(distribution=[single]), rowType=[RecordType(TINYINT byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
    +- Calc(select=[byte, short, int, long, float, double, decimal3020, decimal105]), rowType=[RecordType(TINYINT byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
       +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(byte, short, int, long, float, double, boolean, string, date, time, timestamp, decimal3020, decimal105)]]], fields=[byte, short, int, long, float, double, boolean, string, date, time, timestamp, decimal3020, decimal105]), rowType=[RecordType(TINYINT byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN boolean, VARCHAR(2147483647) string, DATE date, TIME(0)  [...]
@@ -398,14 +398,14 @@ FROM MyTable
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{}], EXPR$0=[AVG($0)], EXPR$1=[AVG($1)], EXPR$2=[AVG($2)], EXPR$3=[AVG($3)], EXPR$4=[AVG($4)], EXPR$5=[AVG($5)], EXPR$6=[AVG($6)], EXPR$7=[AVG($7)]), rowType=[RecordType(DOUBLE EXPR$0, DOUBLE EXPR$1, DOUBLE EXPR$2, DOUBLE EXPR$3, DOUBLE EXPR$4, DOUBLE EXPR$5, DECIMAL(38, 20) EXPR$6, DECIMAL(38, 6) EXPR$7)]
+LogicalAggregate(group=[{}], EXPR$0=[AVG($0)], EXPR$1=[AVG($1)], EXPR$2=[AVG($2)], EXPR$3=[AVG($3)], EXPR$4=[AVG($4)], EXPR$5=[AVG($5)], EXPR$6=[AVG($6)], EXPR$7=[AVG($7)]), rowType=[RecordType(TINYINT EXPR$0, SMALLINT EXPR$1, INTEGER EXPR$2, BIGINT EXPR$3, FLOAT EXPR$4, DOUBLE EXPR$5, DECIMAL(38, 20) EXPR$6, DECIMAL(38, 6) EXPR$7)]
 +- LogicalProject(byte=[$0], short=[$1], int=[$2], long=[$3], float=[$4], double=[$5], decimal3020=[$11], decimal105=[$12]), rowType=[RecordType(TINYINT byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(byte, short, int, long, float, double, boolean, string, date, time, timestamp, decimal3020, decimal105)]]]), rowType=[RecordType(TINYINT byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, TIMESTAMP(3) timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-HashAggregate(isMerge=[true], select=[Final_AVG(sum$0, count$1) AS EXPR$0, Final_AVG(sum$2, count$3) AS EXPR$1, Final_AVG(sum$4, count$5) AS EXPR$2, Final_AVG(sum$6, count$7) AS EXPR$3, Final_AVG(sum$8, count$9) AS EXPR$4, Final_AVG(sum$10, count$11) AS EXPR$5, Final_AVG(sum$12, count$13) AS EXPR$6, Final_AVG(sum$14, count$15) AS EXPR$7]), rowType=[RecordType(DOUBLE EXPR$0, DOUBLE EXPR$1, DOUBLE EXPR$2, DOUBLE EXPR$3, DOUBLE EXPR$4, DOUBLE EXPR$5, DECIMAL(38, 20) EXPR$6, DECIMAL(38, 6) EXPR$7)]
+HashAggregate(isMerge=[true], select=[Final_AVG(sum$0, count$1) AS EXPR$0, Final_AVG(sum$2, count$3) AS EXPR$1, Final_AVG(sum$4, count$5) AS EXPR$2, Final_AVG(sum$6, count$7) AS EXPR$3, Final_AVG(sum$8, count$9) AS EXPR$4, Final_AVG(sum$10, count$11) AS EXPR$5, Final_AVG(sum$12, count$13) AS EXPR$6, Final_AVG(sum$14, count$15) AS EXPR$7]), rowType=[RecordType(TINYINT EXPR$0, SMALLINT EXPR$1, INTEGER EXPR$2, BIGINT EXPR$3, FLOAT EXPR$4, DOUBLE EXPR$5, DECIMAL(38, 20) EXPR$6, DECIMAL(38, 6 [...]
 +- Exchange(distribution=[single]), rowType=[RecordType(BIGINT sum$0, BIGINT count$1, BIGINT sum$2, BIGINT count$3, BIGINT sum$4, BIGINT count$5, BIGINT sum$6, BIGINT count$7, DOUBLE sum$8, BIGINT count$9, DOUBLE sum$10, BIGINT count$11, DECIMAL(38, 20) sum$12, BIGINT count$13, DECIMAL(38, 5) sum$14, BIGINT count$15)]
    +- LocalHashAggregate(select=[Partial_AVG(byte) AS (sum$0, count$1), Partial_AVG(short) AS (sum$2, count$3), Partial_AVG(int) AS (sum$4, count$5), Partial_AVG(long) AS (sum$6, count$7), Partial_AVG(float) AS (sum$8, count$9), Partial_AVG(double) AS (sum$10, count$11), Partial_AVG(decimal3020) AS (sum$12, count$13), Partial_AVG(decimal105) AS (sum$14, count$15)]), rowType=[RecordType(BIGINT sum$0, BIGINT count$1, BIGINT sum$2, BIGINT count$3, BIGINT sum$4, BIGINT count$5, BIGINT sum$6, [...]
       +- Calc(select=[byte, short, int, long, float, double, decimal3020, decimal105]), rowType=[RecordType(TINYINT byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.xml
index 128e9d3..b185edb 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.xml
@@ -30,13 +30,13 @@ FROM MyTable
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), 0), $SUM0($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), null:INTEGER)], EXPR$1=[MAX($0) OVER (PARTITION BY $1 ORDER BY $2 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)], EXPR$2=[/(CAST(CASE(>(COUNT($0) OVER (PARTITION BY $2 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING [...]
+LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), 0), $SUM0($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), null:INTEGER)], EXPR$1=[MAX($0) OVER (PARTITION BY $1 ORDER BY $2 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)], EXPR$2=[CAST(/(CASE(>(COUNT($0) OVER (PARTITION BY $2 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING [...]
 +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[CASE(>(w0$o0, 0:BIGINT), w0$o1, null:INTEGER) AS EXPR$0, w1$o0 AS EXPR$1, /(CAST(CASE(>(w2$o0, 0:BIGINT), w2$o1, null:INTEGER)), w2$o0) AS EXPR$2, w0$o2 AS EXPR$3, w2$o2 AS EXPR$4])
+Calc(select=[CASE(>(w0$o0, 0:BIGINT), w0$o1, null:INTEGER) AS EXPR$0, w1$o0 AS EXPR$1, CAST(/(CASE(>(w2$o0, 0:BIGINT), w2$o1, null:INTEGER), w2$o0)) AS EXPR$2, w0$o2 AS EXPR$3, w2$o2 AS EXPR$4])
 +- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(a) AS w2$o0, $SUM0(a) AS w2$o1, MIN(a) AS w2$o2 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0, w0$o1, w0$o2, w1$o0, w2$o0, w2$o1, w2$o2])
    +- Sort(orderBy=[c ASC, a ASC])
       +- Exchange(distribution=[hash[c]])
@@ -63,13 +63,13 @@ FROM MyTable
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $2 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), 0), $SUM0($0) OVER (PARTITION BY $1 ORDER BY $2 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), null:INTEGER)], EXPR$1=[MAX($0) OVER (PARTITION BY $2 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)], EXPR$2=[MIN($0) OVER (ORDER BY $2 NULLS FIRST, $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW [...]
+LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $2 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), 0), $SUM0($0) OVER (PARTITION BY $1 ORDER BY $2 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), null:INTEGER)], EXPR$1=[MAX($0) OVER (PARTITION BY $2 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)], EXPR$2=[MIN($0) OVER (ORDER BY $2 NULLS FIRST, $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW [...]
 +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[CASE(>(w0$o0, 0:BIGINT), w0$o1, null:INTEGER) AS EXPR$0, w1$o0 AS EXPR$1, w2$o0 AS EXPR$2, w0$o2 AS EXPR$3, /(CAST(CASE(>(w3$o0, 0:BIGINT), w3$o1, null:INTEGER)), w3$o0) AS EXPR$4])
+Calc(select=[CASE(>(w0$o0, 0:BIGINT), w0$o1, null:INTEGER) AS EXPR$0, w1$o0 AS EXPR$1, w2$o0 AS EXPR$2, w0$o2 AS EXPR$3, CAST(/(CASE(>(w3$o0, 0:BIGINT), w3$o1, null:INTEGER), w3$o0)) AS EXPR$4])
 +- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[MAX(a) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o2, w1$o0, w0$o1, w3$o0, w3$o1, w2$o0, w0$o0])
    +- Sort(orderBy=[c ASC, a ASC])
       +- Exchange(distribution=[hash[c]])
@@ -431,13 +431,13 @@ FROM MyTable
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $2 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), 0), $SUM0($0) OVER (PARTITION BY $1 ORDER BY $2 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), null:INTEGER)], EXPR$1=[MAX($0) OVER (PARTITION BY $1 ORDER BY $1 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)], EXPR$2=[/(CAST(CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING [...]
+LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $2 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), 0), $SUM0($0) OVER (PARTITION BY $1 ORDER BY $2 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), null:INTEGER)], EXPR$1=[MAX($0) OVER (PARTITION BY $1 ORDER BY $1 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)], EXPR$2=[CAST(/(CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING [...]
 +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[CASE(>(w0$o0, 0:BIGINT), w0$o1, null:INTEGER) AS EXPR$0, w1$o0 AS EXPR$1, /(CAST(CASE(>(w2$o0, 0:BIGINT), w2$o1, null:INTEGER)), w2$o0) AS EXPR$2, w0$o2 AS EXPR$3, w1$o1 AS EXPR$4])
+Calc(select=[CASE(>(w0$o0, 0:BIGINT), w0$o1, null:INTEGER) AS EXPR$0, w1$o0 AS EXPR$1, CAST(/(CASE(>(w2$o0, 0:BIGINT), w2$o1, null:INTEGER), w2$o0)) AS EXPR$2, w0$o2 AS EXPR$3, w1$o1 AS EXPR$4])
 +- OverAggregate(partitionBy=[b], orderBy=[c ASC], window#0=[COUNT(a) AS w2$o0, $SUM0(a) AS w2$o1, RANK(*) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], window#1=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o1, w0$o2, w1$o1, w0$o0, w2$o0, w2$o1, w1$o0])
    +- Sort(orderBy=[b ASC, c ASC])
       +- OverAggregate(partitionBy=[b], orderBy=[b ASC], window#0=[MAX(a) AS w1$o1, MIN(a) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o1, w0$o2, w1$o1, w0$o0])
@@ -462,13 +462,13 @@ FROM MyTable
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $2 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), 0), $SUM0($0) OVER (PARTITION BY $1 ORDER BY $2 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), null:INTEGER)], EXPR$1=[MAX($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)], EXPR$2=[/(CAST(CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST, $2 NULLS FIRST RANGE BETWEEN UNB [...]
+LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $2 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), 0), $SUM0($0) OVER (PARTITION BY $1 ORDER BY $2 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), null:INTEGER)], EXPR$1=[MAX($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)], EXPR$2=[CAST(/(CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST, $2 NULLS FIRST RANGE BETWEEN UNB [...]
 +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[CASE(>(w0$o0, 0:BIGINT), w0$o1, null:INTEGER) AS EXPR$0, w1$o0 AS EXPR$1, /(CAST(CASE(>(w2$o0, 0:BIGINT), w2$o1, null:INTEGER)), w2$o0) AS EXPR$2, w3$o0 AS EXPR$3, w4$o0 AS EXPR$4])
+Calc(select=[CASE(>(w0$o0, 0:BIGINT), w0$o1, null:INTEGER) AS EXPR$0, w1$o0 AS EXPR$1, CAST(/(CASE(>(w2$o0, 0:BIGINT), w2$o1, null:INTEGER), w2$o0)) AS EXPR$2, w3$o0 AS EXPR$3, w4$o0 AS EXPR$4])
 +- OverAggregate(partitionBy=[b], orderBy=[c ASC], window#0=[COUNT(a) AS w1$o0, $SUM0(a) AS w3$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w2$o0, w0$o0, w2$o1, w4$o0, w0$o1, w1$o0, w3$o0])
    +- Sort(orderBy=[b ASC, c ASC])
       +- OverAggregate(partitionBy=[b], orderBy=[b ASC], window#0=[MIN(a) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w2$o0, w0$o0, w2$o1, w4$o0, w0$o1])
@@ -521,13 +521,13 @@ FROM MyTable
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), 0), $SUM0($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), null:INTEGER)], EXPR$1=[MAX($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)], EXPR$2=[/(CAST(CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $0 DESC NULLS LAST RANGE BETWEEN UNBOUNDED PRECE [...]
+LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), 0), $SUM0($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), null:INTEGER)], EXPR$1=[MAX($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)], EXPR$2=[CAST(/(CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $0 DESC NULLS LAST RANGE BETWEEN UNBOUNDED PRECE [...]
 +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[CASE(>(w0$o0, 0:BIGINT), w0$o1, null:INTEGER) AS EXPR$0, w0$o2 AS EXPR$1, /(CAST(CASE(>(w1$o0, 0:BIGINT), w1$o1, null:INTEGER)), w1$o0) AS EXPR$2, w0$o3 AS EXPR$3, w1$o2 AS EXPR$4])
+Calc(select=[CASE(>(w0$o0, 0:BIGINT), w0$o1, null:INTEGER) AS EXPR$0, w0$o2 AS EXPR$1, CAST(/(CASE(>(w1$o0, 0:BIGINT), w1$o1, null:INTEGER), w1$o0)) AS EXPR$2, w0$o3 AS EXPR$3, w1$o2 AS EXPR$4])
 +- OverAggregate(partitionBy=[b], orderBy=[a DESC], window#0=[COUNT(a) AS w1$o0, $SUM0(a) AS w1$o1, MIN(a) AS w1$o2 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, w0$o0, w0$o1, w0$o2, w0$o3, w1$o0, w1$o1, w1$o2])
    +- Sort(orderBy=[b ASC, a DESC])
       +- OverAggregate(partitionBy=[b], orderBy=[a ASC], window#0=[COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1, MAX(a) AS w0$o2, RANK(*) AS w0$o3 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], window#1=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, w0$o0, w0$o1, w0$o2, w0$o3])
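
The recurring change in these OverAggregateTest plans is the placement of the CAST: the old expected plans widened the $SUM0 operand to DOUBLE before dividing (/(CAST(...), count)), while the refactored plans divide in the accumulator type and cast only the quotient (CAST(/(..., count))), so AVG over an INTEGER column yields an INTEGER again, matching the old planner. The same pattern recurs in WindowGroupReorderRuleTest and the streaming plans further down. A minimal sketch of the two conventions, with hypothetical accumulator values (illustrative only, not the planner's generated code):

    // Illustrative only; sum0/count are hypothetical $SUM0/COUNT accumulators.
    public class AvgCastPlacement {
        public static void main(String[] args) {
            long sum0 = 7L;   // $SUM0(a)
            long count = 2L;  // COUNT(a)

            // Old expected plan: cast the operand first, divide as DOUBLE.
            double oldAvg = (double) sum0 / count;   // 3.5

            // New expected plan: divide first, cast only the quotient.
            int newAvg = (int) (sum0 / count);       // 3

            System.out.println(oldAvg + " vs " + newAvg);
        }
    }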
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml
index 8a1ba86..68afff7 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml
@@ -345,14 +345,14 @@ FROM MyTable
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{}], EXPR$0=[AVG($0)], EXPR$1=[AVG($1)], EXPR$2=[AVG($2)], EXPR$3=[AVG($3)], EXPR$4=[AVG($4)], EXPR$5=[AVG($5)], EXPR$6=[AVG($6)], EXPR$7=[AVG($7)]), rowType=[RecordType(DOUBLE EXPR$0, DOUBLE EXPR$1, DOUBLE EXPR$2, DOUBLE EXPR$3, DOUBLE EXPR$4, DOUBLE EXPR$5, DECIMAL(38, 20) EXPR$6, DECIMAL(38, 6) EXPR$7)]
+LogicalAggregate(group=[{}], EXPR$0=[AVG($0)], EXPR$1=[AVG($1)], EXPR$2=[AVG($2)], EXPR$3=[AVG($3)], EXPR$4=[AVG($4)], EXPR$5=[AVG($5)], EXPR$6=[AVG($6)], EXPR$7=[AVG($7)]), rowType=[RecordType(TINYINT EXPR$0, SMALLINT EXPR$1, INTEGER EXPR$2, BIGINT EXPR$3, FLOAT EXPR$4, DOUBLE EXPR$5, DECIMAL(38, 20) EXPR$6, DECIMAL(38, 6) EXPR$7)]
 +- LogicalProject(byte=[$0], short=[$1], int=[$2], long=[$3], float=[$4], double=[$5], decimal3020=[$11], decimal105=[$12]), rowType=[RecordType(TINYINT byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(byte, short, int, long, float, double, boolean, string, date, time, timestamp, decimal3020, decimal105)]]]), rowType=[RecordType(TINYINT byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, TIMESTAMP(3) timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-SortAggregate(isMerge=[true], select=[Final_AVG(sum$0, count$1) AS EXPR$0, Final_AVG(sum$2, count$3) AS EXPR$1, Final_AVG(sum$4, count$5) AS EXPR$2, Final_AVG(sum$6, count$7) AS EXPR$3, Final_AVG(sum$8, count$9) AS EXPR$4, Final_AVG(sum$10, count$11) AS EXPR$5, Final_AVG(sum$12, count$13) AS EXPR$6, Final_AVG(sum$14, count$15) AS EXPR$7]), rowType=[RecordType(DOUBLE EXPR$0, DOUBLE EXPR$1, DOUBLE EXPR$2, DOUBLE EXPR$3, DOUBLE EXPR$4, DOUBLE EXPR$5, DECIMAL(38, 20) EXPR$6, DECIMAL(38, 6) EXPR$7)]
+SortAggregate(isMerge=[true], select=[Final_AVG(sum$0, count$1) AS EXPR$0, Final_AVG(sum$2, count$3) AS EXPR$1, Final_AVG(sum$4, count$5) AS EXPR$2, Final_AVG(sum$6, count$7) AS EXPR$3, Final_AVG(sum$8, count$9) AS EXPR$4, Final_AVG(sum$10, count$11) AS EXPR$5, Final_AVG(sum$12, count$13) AS EXPR$6, Final_AVG(sum$14, count$15) AS EXPR$7]), rowType=[RecordType(TINYINT EXPR$0, SMALLINT EXPR$1, INTEGER EXPR$2, BIGINT EXPR$3, FLOAT EXPR$4, DOUBLE EXPR$5, DECIMAL(38, 20) EXPR$6, DECIMAL(38, 6 [...]
 +- Exchange(distribution=[single]), rowType=[RecordType(BIGINT sum$0, BIGINT count$1, BIGINT sum$2, BIGINT count$3, BIGINT sum$4, BIGINT count$5, BIGINT sum$6, BIGINT count$7, DOUBLE sum$8, BIGINT count$9, DOUBLE sum$10, BIGINT count$11, DECIMAL(38, 20) sum$12, BIGINT count$13, DECIMAL(38, 5) sum$14, BIGINT count$15)]
    +- LocalSortAggregate(select=[Partial_AVG(byte) AS (sum$0, count$1), Partial_AVG(short) AS (sum$2, count$3), Partial_AVG(int) AS (sum$4, count$5), Partial_AVG(long) AS (sum$6, count$7), Partial_AVG(float) AS (sum$8, count$9), Partial_AVG(double) AS (sum$10, count$11), Partial_AVG(decimal3020) AS (sum$12, count$13), Partial_AVG(decimal105) AS (sum$14, count$15)]), rowType=[RecordType(BIGINT sum$0, BIGINT count$1, BIGINT sum$2, BIGINT count$3, BIGINT sum$4, BIGINT count$5, BIGINT sum$6, [...]
       +- Calc(select=[byte, short, int, long, float, double, decimal3020, decimal105]), rowType=[RecordType(TINYINT byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
@@ -376,14 +376,14 @@ FROM MyTable
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{}], EXPR$0=[AVG($0)], EXPR$1=[AVG($1)], EXPR$2=[AVG($2)], EXPR$3=[AVG($3)], EXPR$4=[AVG($4)], EXPR$5=[AVG($5)], EXPR$6=[AVG($6)], EXPR$7=[AVG($7)]), rowType=[RecordType(DOUBLE EXPR$0, DOUBLE EXPR$1, DOUBLE EXPR$2, DOUBLE EXPR$3, DOUBLE EXPR$4, DOUBLE EXPR$5, DECIMAL(38, 20) EXPR$6, DECIMAL(38, 6) EXPR$7)]
+LogicalAggregate(group=[{}], EXPR$0=[AVG($0)], EXPR$1=[AVG($1)], EXPR$2=[AVG($2)], EXPR$3=[AVG($3)], EXPR$4=[AVG($4)], EXPR$5=[AVG($5)], EXPR$6=[AVG($6)], EXPR$7=[AVG($7)]), rowType=[RecordType(TINYINT EXPR$0, SMALLINT EXPR$1, INTEGER EXPR$2, BIGINT EXPR$3, FLOAT EXPR$4, DOUBLE EXPR$5, DECIMAL(38, 20) EXPR$6, DECIMAL(38, 6) EXPR$7)]
 +- LogicalProject(byte=[$0], short=[$1], int=[$2], long=[$3], float=[$4], double=[$5], decimal3020=[$11], decimal105=[$12]), rowType=[RecordType(TINYINT byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(byte, short, int, long, float, double, boolean, string, date, time, timestamp, decimal3020, decimal105)]]]), rowType=[RecordType(TINYINT byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, TIMESTAMP(3) timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-SortAggregate(isMerge=[false], select=[AVG(byte) AS EXPR$0, AVG(short) AS EXPR$1, AVG(int) AS EXPR$2, AVG(long) AS EXPR$3, AVG(float) AS EXPR$4, AVG(double) AS EXPR$5, AVG(decimal3020) AS EXPR$6, AVG(decimal105) AS EXPR$7]), rowType=[RecordType(DOUBLE EXPR$0, DOUBLE EXPR$1, DOUBLE EXPR$2, DOUBLE EXPR$3, DOUBLE EXPR$4, DOUBLE EXPR$5, DECIMAL(38, 20) EXPR$6, DECIMAL(38, 6) EXPR$7)]
+SortAggregate(isMerge=[false], select=[AVG(byte) AS EXPR$0, AVG(short) AS EXPR$1, AVG(int) AS EXPR$2, AVG(long) AS EXPR$3, AVG(float) AS EXPR$4, AVG(double) AS EXPR$5, AVG(decimal3020) AS EXPR$6, AVG(decimal105) AS EXPR$7]), rowType=[RecordType(TINYINT EXPR$0, SMALLINT EXPR$1, INTEGER EXPR$2, BIGINT EXPR$3, FLOAT EXPR$4, DOUBLE EXPR$5, DECIMAL(38, 20) EXPR$6, DECIMAL(38, 6) EXPR$7)]
 +- Exchange(distribution=[single]), rowType=[RecordType(TINYINT byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
    +- Calc(select=[byte, short, int, long, float, double, decimal3020, decimal105]), rowType=[RecordType(TINYINT byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
       +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(byte, short, int, long, float, double, boolean, string, date, time, timestamp, decimal3020, decimal105)]]], fields=[byte, short, int, long, float, double, boolean, string, date, time, timestamp, decimal3020, decimal105]), rowType=[RecordType(TINYINT byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN boolean, VARCHAR(2147483647) string, DATE date, TIME(0)  [...]
@@ -406,14 +406,14 @@ FROM MyTable
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{}], EXPR$0=[AVG($0)], EXPR$1=[AVG($1)], EXPR$2=[AVG($2)], EXPR$3=[AVG($3)], EXPR$4=[AVG($4)], EXPR$5=[AVG($5)], EXPR$6=[AVG($6)], EXPR$7=[AVG($7)]), rowType=[RecordType(DOUBLE EXPR$0, DOUBLE EXPR$1, DOUBLE EXPR$2, DOUBLE EXPR$3, DOUBLE EXPR$4, DOUBLE EXPR$5, DECIMAL(38, 20) EXPR$6, DECIMAL(38, 6) EXPR$7)]
+LogicalAggregate(group=[{}], EXPR$0=[AVG($0)], EXPR$1=[AVG($1)], EXPR$2=[AVG($2)], EXPR$3=[AVG($3)], EXPR$4=[AVG($4)], EXPR$5=[AVG($5)], EXPR$6=[AVG($6)], EXPR$7=[AVG($7)]), rowType=[RecordType(TINYINT EXPR$0, SMALLINT EXPR$1, INTEGER EXPR$2, BIGINT EXPR$3, FLOAT EXPR$4, DOUBLE EXPR$5, DECIMAL(38, 20) EXPR$6, DECIMAL(38, 6) EXPR$7)]
 +- LogicalProject(byte=[$0], short=[$1], int=[$2], long=[$3], float=[$4], double=[$5], decimal3020=[$11], decimal105=[$12]), rowType=[RecordType(TINYINT byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(byte, short, int, long, float, double, boolean, string, date, time, timestamp, decimal3020, decimal105)]]]), rowType=[RecordType(TINYINT byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, TIMESTAMP(3) timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-SortAggregate(isMerge=[true], select=[Final_AVG(sum$0, count$1) AS EXPR$0, Final_AVG(sum$2, count$3) AS EXPR$1, Final_AVG(sum$4, count$5) AS EXPR$2, Final_AVG(sum$6, count$7) AS EXPR$3, Final_AVG(sum$8, count$9) AS EXPR$4, Final_AVG(sum$10, count$11) AS EXPR$5, Final_AVG(sum$12, count$13) AS EXPR$6, Final_AVG(sum$14, count$15) AS EXPR$7]), rowType=[RecordType(DOUBLE EXPR$0, DOUBLE EXPR$1, DOUBLE EXPR$2, DOUBLE EXPR$3, DOUBLE EXPR$4, DOUBLE EXPR$5, DECIMAL(38, 20) EXPR$6, DECIMAL(38, 6) EXPR$7)]
+SortAggregate(isMerge=[true], select=[Final_AVG(sum$0, count$1) AS EXPR$0, Final_AVG(sum$2, count$3) AS EXPR$1, Final_AVG(sum$4, count$5) AS EXPR$2, Final_AVG(sum$6, count$7) AS EXPR$3, Final_AVG(sum$8, count$9) AS EXPR$4, Final_AVG(sum$10, count$11) AS EXPR$5, Final_AVG(sum$12, count$13) AS EXPR$6, Final_AVG(sum$14, count$15) AS EXPR$7]), rowType=[RecordType(TINYINT EXPR$0, SMALLINT EXPR$1, INTEGER EXPR$2, BIGINT EXPR$3, FLOAT EXPR$4, DOUBLE EXPR$5, DECIMAL(38, 20) EXPR$6, DECIMAL(38, 6 [...]
 +- Exchange(distribution=[single]), rowType=[RecordType(BIGINT sum$0, BIGINT count$1, BIGINT sum$2, BIGINT count$3, BIGINT sum$4, BIGINT count$5, BIGINT sum$6, BIGINT count$7, DOUBLE sum$8, BIGINT count$9, DOUBLE sum$10, BIGINT count$11, DECIMAL(38, 20) sum$12, BIGINT count$13, DECIMAL(38, 5) sum$14, BIGINT count$15)]
    +- LocalSortAggregate(select=[Partial_AVG(byte) AS (sum$0, count$1), Partial_AVG(short) AS (sum$2, count$3), Partial_AVG(int) AS (sum$4, count$5), Partial_AVG(long) AS (sum$6, count$7), Partial_AVG(float) AS (sum$8, count$9), Partial_AVG(double) AS (sum$10, count$11), Partial_AVG(decimal3020) AS (sum$12, count$13), Partial_AVG(decimal105) AS (sum$14, count$15)]), rowType=[RecordType(BIGINT sum$0, BIGINT count$1, BIGINT sum$2, BIGINT count$3, BIGINT sum$4, BIGINT count$5, BIGINT sum$6, [...]
       +- Calc(select=[byte, short, int, long, float, double, decimal3020, decimal105]), rowType=[RecordType(TINYINT byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
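
The rowType updates in SortAggregateTest encode the new AVG type contract directly: the result type now equals the argument type for integral and fractional inputs, while the DECIMAL results keep their already-compatible widened types. The mapping below is read off the expected plans above and nothing beyond them is assumed; the sketch just prints it:

    import java.util.Map;

    // Return-type convention for AVG after the refactor, as shown in the
    // rowType changes above (DECIMAL outputs were already unchanged).
    public class AvgReturnTypes {
        static final Map<String, String> RETURN_TYPE = Map.of(
                "TINYINT", "TINYINT",
                "SMALLINT", "SMALLINT",
                "INTEGER", "INTEGER",
                "BIGINT", "BIGINT",
                "FLOAT", "FLOAT",
                "DOUBLE", "DOUBLE",
                "DECIMAL(30, 20)", "DECIMAL(38, 20)",
                "DECIMAL(10, 5)", "DECIMAL(38, 6)");

        public static void main(String[] args) {
            RETURN_TYPE.forEach((in, out) ->
                    System.out.println("AVG(" + in + ") -> " + out));
        }
    }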
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.xml
index 6286349..d6d70c3 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.xml
@@ -39,11 +39,11 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4], EXPR$4=[TUMBL
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[/(-($f0, /(*(CAST($f1), CAST($f1)), $f2)), $f2) AS EXPR$0, /(-($f0, /(*(CAST($f1), CAST($f1)), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1))) AS EXPR$1, POWER(/(-($f0, /(*(CAST($f1), CAST($f1)), $f2)), $f2), 0.5:DECIMAL(2, 1)) AS EXPR$2, POWER(/(-($f0, /(*(CAST($f1), CAST($f1)), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1))), 0.5:DECIMAL(2, 1)) AS EXPR$3, w$start AS EXPR$4, w$end AS EXPR$5])
+Calc(select=[CAST(/(-($f0, /(*($f1, $f1), $f2)), $f2)) AS EXPR$0, CAST(/(-($f0, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1)))) AS EXPR$1, CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), $f2), 0.5:DECIMAL(2, 1))) AS EXPR$2, CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1))), 0.5:DECIMAL(2, 1))) AS EXPR$3, w$start AS EXPR$4, w$end AS EXPR$5])
 +- HashWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[Final_SUM(sum$0) AS $f0, Final_SUM(sum$1) AS $f1, Final_COUNT(count$2) AS $f2])
    +- Exchange(distribution=[single])
       +- LocalHashWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[Partial_SUM($f2) AS sum$0, Partial_SUM(b) AS sum$1, Partial_COUNT(b) AS count$2])
-         +- Calc(select=[ts, b, *(CAST(b), CAST(b)) AS $f2])
+         +- Calc(select=[ts, b, *(b, b) AS $f2])
             +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(ts, a, b, c)]]], fields=[ts, a, b, c])
 ]]>
     </Resource>
@@ -71,10 +71,10 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4], EXPR$4=[TUMBL
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[/(-($f0, /(*(CAST($f1), CAST($f1)), $f2)), $f2) AS EXPR$0, /(-($f0, /(*(CAST($f1), CAST($f1)), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1))) AS EXPR$1, POWER(/(-($f0, /(*(CAST($f1), CAST($f1)), $f2)), $f2), 0.5:DECIMAL(2, 1)) AS EXPR$2, POWER(/(-($f0, /(*(CAST($f1), CAST($f1)), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1))), 0.5:DECIMAL(2, 1)) AS EXPR$3, w$start AS EXPR$4, w$end AS EXPR$5])
+Calc(select=[CAST(/(-($f0, /(*($f1, $f1), $f2)), $f2)) AS EXPR$0, CAST(/(-($f0, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1)))) AS EXPR$1, CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), $f2), 0.5:DECIMAL(2, 1))) AS EXPR$2, CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1))), 0.5:DECIMAL(2, 1))) AS EXPR$3, w$start AS EXPR$4, w$end AS EXPR$5])
 +- HashWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[SUM($f2) AS $f0, SUM(b) AS $f1, COUNT(b) AS $f2])
    +- Exchange(distribution=[single])
-      +- Calc(select=[ts, b, *(CAST(b), CAST(b)) AS $f2])
+      +- Calc(select=[ts, b, *(b, b) AS $f2])
          +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(ts, a, b, c)]]], fields=[ts, a, b, c])
 ]]>
     </Resource>
@@ -102,11 +102,11 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4], EXPR$4=[TUMBL
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[/(-($f0, /(*(CAST($f1), CAST($f1)), $f2)), $f2) AS EXPR$0, /(-($f0, /(*(CAST($f1), CAST($f1)), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1))) AS EXPR$1, POWER(/(-($f0, /(*(CAST($f1), CAST($f1)), $f2)), $f2), 0.5:DECIMAL(2, 1)) AS EXPR$2, POWER(/(-($f0, /(*(CAST($f1), CAST($f1)), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1))), 0.5:DECIMAL(2, 1)) AS EXPR$3, w$start AS EXPR$4, w$end AS EXPR$5])
+Calc(select=[CAST(/(-($f0, /(*($f1, $f1), $f2)), $f2)) AS EXPR$0, CAST(/(-($f0, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1)))) AS EXPR$1, CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), $f2), 0.5:DECIMAL(2, 1))) AS EXPR$2, CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1))), 0.5:DECIMAL(2, 1))) AS EXPR$3, w$start AS EXPR$4, w$end AS EXPR$5])
 +- HashWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[Final_SUM(sum$0) AS $f0, Final_SUM(sum$1) AS $f1, Final_COUNT(count$2) AS $f2])
    +- Exchange(distribution=[single])
       +- LocalHashWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[Partial_SUM($f2) AS sum$0, Partial_SUM(b) AS sum$1, Partial_COUNT(b) AS count$2])
-         +- Calc(select=[ts, b, *(CAST(b), CAST(b)) AS $f2])
+         +- Calc(select=[ts, b, *(b, b) AS $f2])
             +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(ts, a, b, c)]]], fields=[ts, a, b, c])
 ]]>
     </Resource>
@@ -1436,7 +1436,7 @@ LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)])
         </Resource>
         <Resource name="planAfter">
             <![CDATA[
-Calc(select=[CAST(CASE(=($f1, 0), null:INTEGER, s)) AS s, CAST(/(CAST(CASE(=($f1, 0), null:INTEGER, s)), $f1)) AS a, w$start AS wStart])
+Calc(select=[CAST(CASE(=($f1, 0), null:INTEGER, s)) AS s, CAST(CAST(/(CASE(=($f1, 0), null:INTEGER, s), $f1))) AS a, w$start AS wStart])
 +- HashWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[Final_$SUM0(sum$0) AS s, Final_COUNT(count1$1) AS $f1])
    +- Exchange(distribution=[single])
       +- LocalHashWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[Partial_$SUM0($f1) AS sum$0, Partial_COUNT(*) AS count1$1])
@@ -1472,7 +1472,7 @@ LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)])
         </Resource>
         <Resource name="planAfter">
             <![CDATA[
-Calc(select=[CAST(CASE(=($f1, 0), null:INTEGER, s)) AS s, CAST(/(CAST(CASE(=($f1, 0), null:INTEGER, s)), $f1)) AS a, w$start AS wStart])
+Calc(select=[CAST(CASE(=($f1, 0), null:INTEGER, s)) AS s, CAST(CAST(/(CASE(=($f1, 0), null:INTEGER, s), $f1))) AS a, w$start AS wStart])
 +- HashWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[$SUM0($f1) AS s, COUNT(*) AS $f1])
    +- Exchange(distribution=[single])
       +- Calc(select=[b, CASE(=(a, 1), 1, 99) AS $f1])
@@ -1507,7 +1507,7 @@ LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)])
         </Resource>
         <Resource name="planAfter">
             <![CDATA[
-Calc(select=[CAST(CASE(=($f1, 0), null:INTEGER, s)) AS s, CAST(/(CAST(CASE(=($f1, 0), null:INTEGER, s)), $f1)) AS a, w$start AS wStart])
+Calc(select=[CAST(CASE(=($f1, 0), null:INTEGER, s)) AS s, CAST(CAST(/(CASE(=($f1, 0), null:INTEGER, s), $f1))) AS a, w$start AS wStart])
 +- HashWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[Final_$SUM0(sum$0) AS s, Final_COUNT(count1$1) AS $f1])
    +- Exchange(distribution=[single])
       +- LocalHashWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime], select=[Partial_$SUM0($f1) AS sum$0, Partial_COUNT(*) AS count1$1])
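
In the WindowAggregateTest plans the VAR_POP/VAR_SAMP/STDDEV rewrites lose their operand-level CASTs as well: *(CAST(b), CAST(b)) becomes *(b, b), and a single CAST is applied to the final quotient. A hedged sketch of the rewritten arithmetic, assuming BIGINT accumulators as in the partial-aggregate row types earlier in this patch:

    // Sketch only; sumSq/sum/count are assumed BIGINT accumulators.
    public class VarPopSketch {
        public static void main(String[] args) {
            long sumSq = 29L;  // SUM(b * b)
            long sum = 9L;     // SUM(b)
            long count = 3L;   // COUNT(b)

            // VAR_POP rewrite from the plan:
            // /(-(sumSq, /(*(sum, sum), count)), count),
            // computed in the accumulator type, cast once at the end.
            long varPop = (sumSq - sum * sum / count) / count;
            double stddevPop = Math.sqrt((double) varPop);

            System.out.println(varPop + ", " + stddevPop);
        }
    }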
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRuleTest.xml
index 1212910..d98cbd7 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRuleTest.xml
@@ -371,9 +371,9 @@ LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT($3)])
       <![CDATA[
 FlinkLogicalCalc(select=[a4, c4, s, EXPR$3])
 +- FlinkLogicalAggregate(group=[{0, 2}], c4=[AUXILIARY_GROUP($1)], EXPR$3=[COUNT($3)])
-   +- FlinkLogicalCalc(select=[a4, c4, w$start AS s, /(-($f2, /(*(CAST($f3), CAST($f3)), $f4)), $f4) AS b4])
+   +- FlinkLogicalCalc(select=[a4, c4, w$start AS s, CAST(/(-($f2, /(*($f3, $f3), $f4)), $f4)) AS b4])
       +- FlinkLogicalWindowAggregate(group=[{0}], c4=[AUXILIARY_GROUP($1)], agg#1=[SUM($4)], agg#2=[SUM($3)], agg#3=[COUNT($3)], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime])
-         +- FlinkLogicalCalc(select=[a4, c4, d4, b4, *(CAST(b4), CAST(b4)) AS $f4])
+         +- FlinkLogicalCalc(select=[a4, c4, d4, b4, *(b4, b4) AS $f4])
             +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4, b4, c4, d4])
 ]]>
     </Resource>
@@ -395,9 +395,9 @@ LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT($3)])
       <![CDATA[
 FlinkLogicalCalc(select=[a4, c4, e, EXPR$3])
 +- FlinkLogicalAggregate(group=[{0, 2}], c4=[AUXILIARY_GROUP($1)], EXPR$3=[COUNT($3)])
-   +- FlinkLogicalCalc(select=[a4, c4, w$end AS e, /(-($f2, /(*(CAST($f3), CAST($f3)), $f4)), $f4) AS b4])
+   +- FlinkLogicalCalc(select=[a4, c4, w$end AS e, CAST(/(-($f2, /(*($f3, $f3), $f4)), $f4)) AS b4])
       +- FlinkLogicalWindowAggregate(group=[{0}], c4=[AUXILIARY_GROUP($1)], agg#1=[SUM($4)], agg#2=[SUM($3)], agg#3=[COUNT($3)], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime])
-         +- FlinkLogicalCalc(select=[a4, c4, d4, b4, *(CAST(b4), CAST(b4)) AS $f4])
+         +- FlinkLogicalCalc(select=[a4, c4, d4, b4, *(b4, b4) AS $f4])
             +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4, b4, c4, d4])
 ]]>
     </Resource>
@@ -418,9 +418,9 @@ LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()])
     <Resource name="planAfter">
       <![CDATA[
 FlinkLogicalAggregate(group=[{0, 1}], c4=[AUXILIARY_GROUP($2)], EXPR$3=[COUNT()])
-+- FlinkLogicalCalc(select=[a4, /(-($f2, /(*(CAST($f3), CAST($f3)), $f4)), $f4) AS b4, c4])
++- FlinkLogicalCalc(select=[a4, CAST(/(-($f2, /(*($f3, $f3), $f4)), $f4)) AS b4, c4])
    +- FlinkLogicalWindowAggregate(group=[{0}], c4=[AUXILIARY_GROUP($1)], agg#1=[SUM($4)], agg#2=[SUM($3)], agg#3=[COUNT($3)], window=[TumblingGroupWindow], properties=[])
-      +- FlinkLogicalCalc(select=[a4, c4, d4, b4, *(CAST(b4), CAST(b4)) AS $f4])
+      +- FlinkLogicalCalc(select=[a4, c4, d4, b4, *(b4, b4) AS $f4])
          +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]], fields=[a4, b4, c4, d4])
 ]]>
     </Resource>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.xml
index b59a9c8..5c73519 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.xml
@@ -228,7 +228,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[SUM($2)], EXP
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-FlinkLogicalCalc(select=[a, EXPR$1, CASE(=($f3, 0), null:INTEGER, EXPR$2) AS EXPR$2, /(CAST(CASE(=($f3, 0), null:INTEGER, EXPR$2)), $f3) AS EXPR$3, EXPR$4, EXPR$5, $f3 AS EXPR$6, EXPR$7])
+FlinkLogicalCalc(select=[a, EXPR$1, CASE(=($f3, 0), null:INTEGER, EXPR$2) AS EXPR$2, CAST(/(CASE(=($f3, 0), null:INTEGER, EXPR$2), $f3)) AS EXPR$3, EXPR$4, EXPR$5, $f3 AS EXPR$6, EXPR$7])
 +- FlinkLogicalAggregate(group=[{0}], agg#0=[$SUM0($3)], agg#1=[$SUM0($4)], agg#2=[$SUM0($5)], agg#3=[MAX($6)], agg#4=[MIN($7)], agg#5=[$SUM0($8)])
    +- FlinkLogicalAggregate(group=[{0, 3, 4}], agg#0=[COUNT(DISTINCT $2) FILTER $5], agg#1=[$SUM0($1) FILTER $6], agg#2=[COUNT($1) FILTER $6], agg#3=[MAX($1) FILTER $7], agg#4=[MIN($1) FILTER $7], agg#5=[COUNT() FILTER $6])
       +- FlinkLogicalCalc(select=[a, b, c, $f3, $f4, =($e, 2) AS $g_2, =($e, 3) AS $g_3, =($e, 1) AS $g_1])
@@ -251,7 +251,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[SUM($1)], EXP
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-FlinkLogicalCalc(select=[a, $f1, $f2, CAST(/($f3, $f4)) AS $f3])
+FlinkLogicalCalc(select=[a, $f1, $f2, CAST(IF(=($f4, 0:BIGINT), null:INTEGER, /($f3, $f4))) AS $f3])
 +- FlinkLogicalAggregate(group=[{0}], agg#0=[$SUM0($2)], agg#1=[SUM($3)], agg#2=[$SUM0($4)], agg#3=[$SUM0($5)])
    +- FlinkLogicalAggregate(group=[{0, 2}], agg#0=[COUNT(DISTINCT $1) FILTER $3], agg#1=[SUM($1) FILTER $4], agg#2=[$SUM0($1) FILTER $4], agg#3=[COUNT($1) FILTER $4])
       +- FlinkLogicalCalc(select=[a, b, $f2, =($e, 0) AS $g_0, =($e, 1) AS $g_1])
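
The second SplitAggregateRuleTest hunk is a behavioral fix rather than a pure cast move: the final projection of the split AVG now guards the division with IF(=(count, 0), null, /(sum, count)) instead of dividing unconditionally, since a group whose rows all land in other partial buckets yields count = 0. A minimal sketch of the guard, with hypothetical accumulators:

    // Hypothetical accumulators; mirrors IF(=($f4, 0), null, /($f3, $f4)).
    public class AvgNullGuard {
        static Long finalAvg(long sum0, long count) {
            return count == 0L ? null : sum0 / count;
        }

        public static void main(String[] args) {
            System.out.println(finalAvg(10L, 4L)); // 2
            System.out.println(finalAvg(0L, 0L));  // null
        }
    }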
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/WindowGroupReorderRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/WindowGroupReorderRuleTest.xml
index cb2d436..e208fba 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/WindowGroupReorderRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/WindowGroupReorderRuleTest.xml
@@ -30,13 +30,13 @@ FROM MyTable
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), 0), $SUM0($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), null:INTEGER)], EXPR$1=[MAX($0) OVER (PARTITION BY $1 ORDER BY $2 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)], EXPR$2=[/(CAST(CASE(>(COUNT($0) OVER (PARTITION BY $2 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING [...]
+LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), 0), $SUM0($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), null:INTEGER)], EXPR$1=[MAX($0) OVER (PARTITION BY $1 ORDER BY $2 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)], EXPR$2=[CAST(/(CASE(>(COUNT($0) OVER (PARTITION BY $2 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING [...]
 +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-LogicalProject(EXPR$0=[CASE(>($3, 0), $4, null:INTEGER)], EXPR$1=[$6], EXPR$2=[/(CAST(CASE(>($7, 0), $8, null:INTEGER)):DOUBLE, $7)], EXPR$3=[$5], EXPR$4=[$9])
+LogicalProject(EXPR$0=[CASE(>($3, 0), $4, null:INTEGER)], EXPR$1=[$6], EXPR$2=[CAST(/(CASE(>($7, 0), $8, null:INTEGER), $7)):INTEGER], EXPR$3=[$5], EXPR$4=[$9])
 +- LogicalWindow(window#0=[window(partition {1} order by [0 ASC-nulls-first] range between UNBOUNDED PRECEDING and CURRENT ROW aggs [COUNT($0), $SUM0($0), RANK()])], window#1=[window(partition {1} order by [2 ASC-nulls-first] range between UNBOUNDED PRECEDING and CURRENT ROW aggs [MAX($0)])], window#2=[window(partition {2} order by [0 ASC-nulls-first] range between UNBOUNDED PRECEDING and CURRENT ROW aggs [COUNT($0), $SUM0($0), MIN($0)])])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
 ]]>
@@ -56,13 +56,13 @@ FROM MyTable
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $2 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), 0), $SUM0($0) OVER (PARTITION BY $1 ORDER BY $2 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), null:INTEGER)], EXPR$1=[MAX($0) OVER (PARTITION BY $2 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)], EXPR$2=[MIN($0) OVER (ORDER BY $2 NULLS FIRST, $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW [...]
+LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $2 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), 0), $SUM0($0) OVER (PARTITION BY $1 ORDER BY $2 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), null:INTEGER)], EXPR$1=[MAX($0) OVER (PARTITION BY $2 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)], EXPR$2=[MIN($0) OVER (ORDER BY $2 NULLS FIRST, $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW [...]
 +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-LogicalProject(EXPR$0=[CASE(>($3, 0), $4, null:INTEGER)], EXPR$1=[$6], EXPR$2=[$7], EXPR$3=[$5], EXPR$4=[/(CAST(CASE(>($8, 0), $9, null:INTEGER)):DOUBLE, $8)])
+LogicalProject(EXPR$0=[CASE(>($3, 0), $4, null:INTEGER)], EXPR$1=[$6], EXPR$2=[$7], EXPR$3=[$5], EXPR$4=[CAST(/(CASE(>($8, 0), $9, null:INTEGER), $8)):INTEGER])
 +- LogicalProject(a=[$0], b=[$1], c=[$2], w0$o0=[$6], w0$o1=[$7], w0$o2=[$8], w1$o0=[$9], w2$o0=[$5], w3$o0=[$3], w3$o1=[$4])
    +- LogicalWindow(window#0=[window(partition {} order by [1 ASC-nulls-first] range between UNBOUNDED PRECEDING and CURRENT ROW aggs [COUNT($0), $SUM0($0)])], window#1=[window(partition {} order by [2 ASC-nulls-first, 0 ASC-nulls-first] range between UNBOUNDED PRECEDING and CURRENT ROW aggs [MIN($0)])], window#2=[window(partition {1} order by [2 ASC-nulls-first] range between UNBOUNDED PRECEDING and CURRENT ROW aggs [COUNT($0), $SUM0($0), RANK()])], window#3=[window(partition {2} order  [...]
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
@@ -83,13 +83,13 @@ FROM MyTable
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), 0), $SUM0($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), null:INTEGER)], EXPR$1=[MAX($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)], EXPR$2=[/(CAST(CASE(>(COUNT($0) OVER (PARTITION BY $2 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING [...]
+LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), 0), $SUM0($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), null:INTEGER)], EXPR$1=[MAX($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)], EXPR$2=[CAST(/(CASE(>(COUNT($0) OVER (PARTITION BY $2 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING [...]
 +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-LogicalProject(EXPR$0=[CASE(>($3, 0), $4, null:INTEGER)], EXPR$1=[$5], EXPR$2=[/(CAST(CASE(>($7, 0), $8, null:INTEGER)):DOUBLE, $7)], EXPR$3=[$6], EXPR$4=[$9])
+LogicalProject(EXPR$0=[CASE(>($3, 0), $4, null:INTEGER)], EXPR$1=[$5], EXPR$2=[CAST(/(CASE(>($7, 0), $8, null:INTEGER), $7)):INTEGER], EXPR$3=[$6], EXPR$4=[$9])
 +- LogicalWindow(window#0=[window(partition {1} order by [0 ASC-nulls-first] range between UNBOUNDED PRECEDING and CURRENT ROW aggs [COUNT($0), $SUM0($0), MAX($0), RANK()])], window#1=[window(partition {2} order by [0 ASC-nulls-first] range between UNBOUNDED PRECEDING and CURRENT ROW aggs [COUNT($0), $SUM0($0), MIN($0)])])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
 ]]>
@@ -137,13 +137,13 @@ FROM MyTable
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $2 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), 0), $SUM0($0) OVER (PARTITION BY $1 ORDER BY $2 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), null:INTEGER)], EXPR$1=[MAX($0) OVER (PARTITION BY $1 ORDER BY $1 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)], EXPR$2=[/(CAST(CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING [...]
+LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $2 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), 0), $SUM0($0) OVER (PARTITION BY $1 ORDER BY $2 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), null:INTEGER)], EXPR$1=[MAX($0) OVER (PARTITION BY $1 ORDER BY $1 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)], EXPR$2=[CAST(/(CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING [...]
 +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-LogicalProject(EXPR$0=[CASE(>($3, 0), $4, null:INTEGER)], EXPR$1=[$6], EXPR$2=[/(CAST(CASE(>($8, 0), $9, null:INTEGER)):DOUBLE, $8)], EXPR$3=[$5], EXPR$4=[$7])
+LogicalProject(EXPR$0=[CASE(>($3, 0), $4, null:INTEGER)], EXPR$1=[$6], EXPR$2=[CAST(/(CASE(>($8, 0), $9, null:INTEGER), $8)):INTEGER], EXPR$3=[$5], EXPR$4=[$7])
 +- LogicalProject(a=[$0], b=[$1], c=[$2], w0$o0=[$7], w0$o1=[$8], w0$o2=[$9], w1$o0=[$5], w1$o1=[$6], w2$o0=[$3], w2$o1=[$4])
    +- LogicalWindow(window#0=[window(partition {1} order by [0 ASC-nulls-first] range between UNBOUNDED PRECEDING and CURRENT ROW aggs [COUNT($0), $SUM0($0)])], window#1=[window(partition {1} order by [1 ASC-nulls-first] range between UNBOUNDED PRECEDING and CURRENT ROW aggs [MAX($0), MIN($0)])], window#2=[window(partition {1} order by [2 ASC-nulls-first] range between UNBOUNDED PRECEDING and CURRENT ROW aggs [COUNT($0), $SUM0($0), RANK()])])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
@@ -164,13 +164,13 @@ FROM MyTable
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $2 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), 0), $SUM0($0) OVER (PARTITION BY $1 ORDER BY $2 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), null:INTEGER)], EXPR$1=[MAX($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)], EXPR$2=[/(CAST(CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST, $2 NULLS FIRST RANGE BETWEEN UNB [...]
+LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $2 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), 0), $SUM0($0) OVER (PARTITION BY $1 ORDER BY $2 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), null:INTEGER)], EXPR$1=[MAX($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)], EXPR$2=[CAST(/(CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST, $2 NULLS FIRST RANGE BETWEEN UNB [...]
 +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-LogicalProject(EXPR$0=[CASE(>($3, 0), $4, null:INTEGER)], EXPR$1=[$5], EXPR$2=[/(CAST(CASE(>($6, 0), $7, null:INTEGER)):DOUBLE, $6)], EXPR$3=[$8], EXPR$4=[$9])
+LogicalProject(EXPR$0=[CASE(>($3, 0), $4, null:INTEGER)], EXPR$1=[$5], EXPR$2=[CAST(/(CASE(>($6, 0), $7, null:INTEGER), $6)):INTEGER], EXPR$3=[$8], EXPR$4=[$9])
 +- LogicalProject(a=[$0], b=[$1], c=[$2], w0$o0=[$8], w0$o1=[$9], w1$o0=[$3], w2$o0=[$5], w2$o1=[$6], w3$o0=[$4], w4$o0=[$7])
    +- LogicalWindow(window#0=[window(partition {1} order by [0 ASC-nulls-first] range between UNBOUNDED PRECEDING and CURRENT ROW aggs [MAX($0)])], window#1=[window(partition {1} order by [0 ASC-nulls-first, 1 ASC-nulls-first] range between UNBOUNDED PRECEDING and CURRENT ROW aggs [RANK()])], window#2=[window(partition {1} order by [0 ASC-nulls-first, 2 ASC-nulls-first] range between UNBOUNDED PRECEDING and CURRENT ROW aggs [COUNT($0), $SUM0($0)])], window#3=[window(partition {1} order b [...]
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
@@ -191,13 +191,13 @@ FROM MyTable
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), 0), $SUM0($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), null:INTEGER)], EXPR$1=[MAX($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)], EXPR$2=[/(CAST(CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $0 DESC NULLS LAST RANGE BETWEEN UNBOUNDED PRECE [...]
+LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), 0), $SUM0($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), null:INTEGER)], EXPR$1=[MAX($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)], EXPR$2=[CAST(/(CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $0 DESC NULLS LAST RANGE BETWEEN UNBOUNDED PRECE [...]
 +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-LogicalProject(EXPR$0=[CASE(>($2, 0), $3, null:INTEGER)], EXPR$1=[$4], EXPR$2=[/(CAST(CASE(>($6, 0), $7, null:INTEGER)):DOUBLE, $6)], EXPR$3=[$5], EXPR$4=[$8])
+LogicalProject(EXPR$0=[CASE(>($2, 0), $3, null:INTEGER)], EXPR$1=[$4], EXPR$2=[CAST(/(CASE(>($6, 0), $7, null:INTEGER), $6)):INTEGER], EXPR$3=[$5], EXPR$4=[$8])
 +- LogicalWindow(window#0=[window(partition {1} order by [0 ASC-nulls-first] range between UNBOUNDED PRECEDING and CURRENT ROW aggs [COUNT($0), $SUM0($0), MAX($0), RANK()])], window#1=[window(partition {1} order by [0 DESC-nulls-last] range between UNBOUNDED PRECEDING and CURRENT ROW aggs [COUNT($0), $SUM0($0), MIN($0)])])
    +- LogicalProject(a=[$0], b=[$1])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/subquery/SubQuerySemiJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/subquery/SubQuerySemiJoinTest.xml
index d2bd212..cfae4e1 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/subquery/SubQuerySemiJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/subquery/SubQuerySemiJoinTest.xml
@@ -1794,15 +1794,13 @@ LogicalAggregate(group=[{}], EXPR$0=[AVG($0)])
     <Resource name="planAfter">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2])
-+- LogicalProject(a=[$0], b=[$1], c=[$2])
-   +- LogicalJoin(condition=[AND(=($3, $4), =($2, $5))], joinType=[semi])
-      :- LogicalProject(a=[$0], b=[$1], c=[$2], b0=[CAST($1):DOUBLE])
-      :  +- LogicalTableScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]])
-      +- LogicalProject(EXPR$0=[$1], f=[$0])
-         +- LogicalAggregate(group=[{0}], EXPR$0=[AVG($1)])
-            +- LogicalProject(f=[$2], e=[$1])
-               +- LogicalFilter(condition=[true])
-                  +- LogicalTableScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]])
++- LogicalJoin(condition=[AND(=($1, $3), =($2, $4))], joinType=[semi])
+   :- LogicalTableScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject(EXPR$0=[$1], f=[$0])
+      +- LogicalAggregate(group=[{0}], EXPR$0=[AVG($1)])
+         +- LogicalProject(f=[$2], e=[$1])
+            +- LogicalFilter(condition=[true])
+               +- LogicalTableScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]])
 ]]>
     </Resource>
   </TestCase>
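
The SubQuerySemiJoinTest plan shrinks because the probe side no longer needs the b0=[CAST($1):DOUBLE] projection: once AVG(e) returns e's own type, the semi-join condition compares the original columns directly. A toy sketch of the key comparison before and after, with assumed INTEGER-typed columns:

    // Types assumed for illustration; b is the probe-side key,
    // avgE stands in for the AVG(e) subquery result.
    public class SemiJoinKey {
        public static void main(String[] args) {
            int b = 42;
            int avgE = 42;
            boolean oldKey = (double) b == (double) avgE; // old plan: widen both sides
            boolean newKey = b == avgE;                   // new plan: direct comparison
            System.out.println(oldKey + " " + newKey);
        }
    }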
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
index 70c9d45..05143e1 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
@@ -176,14 +176,14 @@ FROM MyTable1
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{}], EXPR$0=[AVG($0)], EXPR$1=[AVG($1)], EXPR$2=[AVG($2)], EXPR$3=[AVG($3)], EXPR$4=[AVG($4)], EXPR$5=[AVG($5)], EXPR$6=[AVG($6)], EXPR$7=[AVG($7)]), rowType=[RecordType(DOUBLE EXPR$0, DOUBLE EXPR$1, DOUBLE EXPR$2, DOUBLE EXPR$3, DOUBLE EXPR$4, DOUBLE EXPR$5, DECIMAL(38, 20) EXPR$6, DECIMAL(38, 6) EXPR$7)]
+LogicalAggregate(group=[{}], EXPR$0=[AVG($0)], EXPR$1=[AVG($1)], EXPR$2=[AVG($2)], EXPR$3=[AVG($3)], EXPR$4=[AVG($4)], EXPR$5=[AVG($5)], EXPR$6=[AVG($6)], EXPR$7=[AVG($7)]), rowType=[RecordType(TINYINT EXPR$0, SMALLINT EXPR$1, INTEGER EXPR$2, BIGINT EXPR$3, FLOAT EXPR$4, DOUBLE EXPR$5, DECIMAL(38, 20) EXPR$6, DECIMAL(38, 6) EXPR$7)]
 +- LogicalProject(byte=[$0], short=[$1], int=[$2], long=[$3], float=[$4], double=[$5], decimal3020=[$11], decimal105=[$12]), rowType=[RecordType(TINYINT byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(byte, short, int, long, float, double, boolean, string, date, time, timestamp, decimal3020, decimal105)]]]), rowType=[RecordType(TINYINT byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, TIMESTAMP(3) timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-GroupAggregate(select=[AVG(byte) AS EXPR$0, AVG(short) AS EXPR$1, AVG(int) AS EXPR$2, AVG(long) AS EXPR$3, AVG(float) AS EXPR$4, AVG(double) AS EXPR$5, AVG(decimal3020) AS EXPR$6, AVG(decimal105) AS EXPR$7]), rowType=[RecordType(DOUBLE EXPR$0, DOUBLE EXPR$1, DOUBLE EXPR$2, DOUBLE EXPR$3, DOUBLE EXPR$4, DOUBLE EXPR$5, DECIMAL(38, 20) EXPR$6, DECIMAL(38, 6) EXPR$7)]
+GroupAggregate(select=[AVG(byte) AS EXPR$0, AVG(short) AS EXPR$1, AVG(int) AS EXPR$2, AVG(long) AS EXPR$3, AVG(float) AS EXPR$4, AVG(double) AS EXPR$5, AVG(decimal3020) AS EXPR$6, AVG(decimal105) AS EXPR$7]), rowType=[RecordType(TINYINT EXPR$0, SMALLINT EXPR$1, INTEGER EXPR$2, BIGINT EXPR$3, FLOAT EXPR$4, DOUBLE EXPR$5, DECIMAL(38, 20) EXPR$6, DECIMAL(38, 6) EXPR$7)]
 +- Exchange(distribution=[single]), rowType=[RecordType(TINYINT byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
    +- Calc(select=[byte, short, int, long, float, double, decimal3020, decimal105]), rowType=[RecordType(TINYINT byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
       +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(byte, short, int, long, float, double, boolean, string, date, time, timestamp, decimal3020, decimal105)]]], fields=[byte, short, int, long, float, double, boolean, string, date, time, timestamp, decimal3020, decimal105]), rowType=[RecordType(TINYINT byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN boolean, VARCHAR(2147483647) string, DATE date, TIME(0) [...]
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
index dc74eb4..d2b8388 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
@@ -876,7 +876,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[SUM($2)], EXP
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, EXPR$1, CASE(=($f3, 0), null:BIGINT, EXPR$2) AS EXPR$2, /(CAST(CASE(=($f3, 0), null:BIGINT, EXPR$2)), $f3) AS EXPR$3, EXPR$4, EXPR$5, $f3 AS EXPR$6, EXPR$7])
+Calc(select=[a, EXPR$1, CASE(=($f3, 0), null:BIGINT, EXPR$2) AS EXPR$2, /(CASE(=($f3, 0), null:BIGINT, EXPR$2), $f3) AS EXPR$3, EXPR$4, EXPR$5, $f3 AS EXPR$6, EXPR$7])
 +- GroupAggregate(groupBy=[a], select=[a, COUNT(DISTINCT c) AS EXPR$1, $SUM0(b) AS EXPR$2, COUNT(b) AS $f3, MAX(b) AS EXPR$4, MIN(b) AS EXPR$5, COUNT(*) AS EXPR$7])
    +- Exchange(distribution=[hash[a]])
       +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
@@ -901,7 +901,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[SUM($2)], EXP
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, EXPR$1, CASE(=($f3, 0), null:BIGINT, EXPR$2) AS EXPR$2, /(CAST(CASE(=($f3, 0), null:BIGINT, EXPR$2)), $f3) AS EXPR$3, EXPR$4, EXPR$5, $f3 AS EXPR$6, EXPR$7])
+Calc(select=[a, EXPR$1, CASE(=($f3, 0), null:BIGINT, EXPR$2) AS EXPR$2, /(CASE(=($f3, 0), null:BIGINT, EXPR$2), $f3) AS EXPR$3, EXPR$4, EXPR$5, $f3 AS EXPR$6, EXPR$7])
 +- GlobalGroupAggregate(groupBy=[a], select=[a, COUNT(distinct$0 count$0) AS EXPR$1, $SUM0(sum$1) AS EXPR$2, COUNT(count$2) AS $f3, MAX(max$3) AS EXPR$4, MIN(min$4) AS EXPR$5, COUNT(count1$5) AS EXPR$7])
    +- Exchange(distribution=[hash[a]])
       +- LocalGroupAggregate(groupBy=[a], select=[a, COUNT(distinct$0 c) AS count$0, $SUM0(b) AS sum$1, COUNT(b) AS count$2, MAX(b) AS max$3, MIN(b) AS min$4, COUNT(*) AS count1$5, DISTINCT(c) AS distinct$0])
@@ -927,7 +927,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[SUM($2)], EXP
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, EXPR$1, CASE(=($f3, 0), null:BIGINT, EXPR$2) AS EXPR$2, /(CAST(CASE(=($f3, 0), null:BIGINT, EXPR$2)), $f3) AS EXPR$3, EXPR$4, EXPR$5, $f3 AS EXPR$6, EXPR$7])
+Calc(select=[a, EXPR$1, CASE(=($f3, 0), null:BIGINT, EXPR$2) AS EXPR$2, /(CASE(=($f3, 0), null:BIGINT, EXPR$2), $f3) AS EXPR$3, EXPR$4, EXPR$5, $f3 AS EXPR$6, EXPR$7])
 +- GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($f3_0) AS $f1, $SUM0_RETRACT($f4_0) AS $f2, $SUM0_RETRACT($f5) AS $f3, MAX($f6) AS $f4, MIN($f7) AS $f5, $SUM0_RETRACT($f8) AS $f6])
    +- Exchange(distribution=[hash[a]])
       +- GroupAggregate(groupBy=[a, $f3, $f4], partialFinalType=[PARTIAL], select=[a, $f3, $f4, COUNT(DISTINCT c) FILTER $g_2 AS $f3_0, $SUM0(b) FILTER $g_3 AS $f4_0, COUNT(b) FILTER $g_3 AS $f5, MAX(b) FILTER $g_1 AS $f6, MIN(b) FILTER $g_1 AS $f7, COUNT(*) FILTER $g_3 AS $f8])
@@ -957,7 +957,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[SUM($2)], EXP
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, EXPR$1, CASE(=($f3, 0), null:BIGINT, EXPR$2) AS EXPR$2, /(CAST(CASE(=($f3, 0), null:BIGINT, EXPR$2)), $f3) AS EXPR$3, EXPR$4, EXPR$5, $f3 AS EXPR$6, EXPR$7])
+Calc(select=[a, EXPR$1, CASE(=($f3, 0), null:BIGINT, EXPR$2) AS EXPR$2, /(CASE(=($f3, 0), null:BIGINT, EXPR$2), $f3) AS EXPR$3, EXPR$4, EXPR$5, $f3 AS EXPR$6, EXPR$7])
 +- GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT(sum$0) AS $f1, $SUM0_RETRACT(sum$1) AS $f2, $SUM0_RETRACT(sum$2) AS $f3, MAX(max$3) AS $f4, MIN(min$4) AS $f5, $SUM0_RETRACT(sum$5) AS $f6])
    +- Exchange(distribution=[hash[a]])
       +- LocalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($f3_0) AS sum$0, $SUM0_RETRACT($f4_0) AS sum$1, $SUM0_RETRACT($f5) AS sum$2, MAX($f6) AS max$3, MIN($f7) AS min$4, $SUM0_RETRACT($f8) AS sum$5, COUNT_RETRACT(*) AS count1$6])
@@ -1028,7 +1028,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[SUM($1)], EXP
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, $f1, $f2, CAST(/($f3, $f4)) AS $f3])
+Calc(select=[a, $f1, $f2, IF(=($f4, 0:BIGINT), null:BIGINT, /($f3, $f4)) AS $f3])
 +- GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($f2_0) AS $f1, SUM_RETRACT($f3) AS $f2, $SUM0_RETRACT($f4) AS $f3, $SUM0_RETRACT($f5) AS $f4])
    +- Exchange(distribution=[hash[a]])
       +- GroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(DISTINCT b) FILTER $g_0 AS $f2_0, SUM(b) FILTER $g_1 AS $f3, $SUM0(b) FILTER $g_1 AS $f4, COUNT(b) FILTER $g_1 AS $f5])
@@ -1054,7 +1054,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[SUM($1)], EXP
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, $f1, $f2, CAST(/($f3, $f4)) AS $f3])
+Calc(select=[a, $f1, $f2, IF(=($f4, 0:BIGINT), null:BIGINT, /($f3, $f4)) AS $f3])
 +- GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT(sum$0) AS $f1, SUM_RETRACT((sum$1, count$2)) AS $f2, $SUM0_RETRACT(sum$3) AS $f3, $SUM0_RETRACT(sum$4) AS $f4])
    +- Exchange(distribution=[hash[a]])
       +- LocalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($f2_0) AS sum$0, SUM_RETRACT($f3) AS (sum$1, count$2), $SUM0_RETRACT($f4) AS sum$3, $SUM0_RETRACT($f5) AS sum$4, COUNT_RETRACT(*) AS count1$5])
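
The streaming DistinctAggregateTest plans also show the degenerate case: for a BIGINT argument the outer CAST disappears entirely, because the $SUM0/COUNT quotient already carries the declared BIGINT result type. Sketch under that assumption:

    // Assumes a BIGINT argument, as in the plans above; no cast is needed.
    public class BigintAvg {
        public static void main(String[] args) {
            long sum0 = 10L, count = 4L;
            long avg = sum0 / count; // BIGINT / BIGINT -> BIGINT
            System.out.println(avg); // 2
        }
    }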
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
index ccef832..01dcc34 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
@@ -261,7 +261,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[SUM($2)], EXP
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, EXPR$1, CASE(=($f3, 0), null:BIGINT, EXPR$2) AS EXPR$2, /(CAST(CASE(=($f3, 0), null:BIGINT, EXPR$2)), $f3) AS EXPR$3, EXPR$4, EXPR$5, $f3 AS EXPR$6, EXPR$7])
+Calc(select=[a, EXPR$1, CASE(=($f3, 0), null:BIGINT, EXPR$2) AS EXPR$2, /(CASE(=($f3, 0), null:BIGINT, EXPR$2), $f3) AS EXPR$3, EXPR$4, EXPR$5, $f3 AS EXPR$6, EXPR$7])
 +- GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0(count$0) AS $f1, $SUM0(sum$1) AS $f2, $SUM0(count$2) AS $f3, MAX(max$3) AS $f4, MIN(min$4) AS $f5, $SUM0(count1$5) AS $f6])
    +- Exchange(distribution=[hash[a]])
       +- IncrementalGroupAggregate(partialAggGrouping=[a, $f3, $f4], finalAggGrouping=[a], select=[a, COUNT(distinct$0 count$0) AS count$0, $SUM0(sum$1) AS sum$1, COUNT(count$2) AS count$2, MAX(max$3) AS max$3, MIN(min$4) AS min$4, COUNT(count1$5) AS count1$5])
@@ -288,7 +288,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[SUM($1)], EXP
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, $f1, $f2, CAST(/($f3, $f4)) AS $f3])
+Calc(select=[a, $f1, $f2, IF(=($f4, 0:BIGINT), null:BIGINT, /($f3, $f4)) AS $f3])
 +- GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0(count$0) AS $f1, SUM(sum$1) AS $f2, $SUM0(sum$2) AS $f3, $SUM0(count$3) AS $f4])
    +- Exchange(distribution=[hash[a]])
       +- IncrementalGroupAggregate(partialAggGrouping=[a, $f2], finalAggGrouping=[a], select=[a, COUNT(distinct$0 count$0) AS count$0, SUM(sum$1) AS sum$1, $SUM0(sum$2) AS sum$2, COUNT(count$3) AS count$3])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml
index 589ad6c..5fe752c 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml
@@ -134,13 +134,13 @@ FROM MyTable
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(a=[$0], avgA=[/(CAST(CASE(>(COUNT($2) OVER (PARTITION BY $0 ORDER BY $3 NULLS FIRST RANGE BETWEEN 7200000:INTERVAL HOUR PRECEDING AND CURRENT ROW), 0), $SUM0($2) OVER (PARTITION BY $0 ORDER BY $3 NULLS FIRST RANGE BETWEEN 7200000:INTERVAL HOUR PRECEDING AND CURRENT ROW), null:BIGINT)):DOUBLE, COUNT($2) OVER (PARTITION BY $0 ORDER BY $3 NULLS FIRST RANGE BETWEEN 7200000:INTERVAL HOUR PRECEDING AND CURRENT ROW))])
+LogicalProject(a=[$0], avgA=[/(CASE(>(COUNT($2) OVER (PARTITION BY $0 ORDER BY $3 NULLS FIRST RANGE BETWEEN 7200000:INTERVAL HOUR PRECEDING AND CURRENT ROW), 0), $SUM0($2) OVER (PARTITION BY $0 ORDER BY $3 NULLS FIRST RANGE BETWEEN 7200000:INTERVAL HOUR PRECEDING AND CURRENT ROW), null:BIGINT), COUNT($2) OVER (PARTITION BY $0 ORDER BY $3 NULLS FIRST RANGE BETWEEN 7200000:INTERVAL HOUR PRECEDING AND CURRENT ROW))])
 +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, /(CAST(CASE(>(w0$o0, 0:BIGINT), w0$o1, null:BIGINT)), w0$o0) AS avgA])
+Calc(select=[a, /(CASE(>(w0$o0, 0:BIGINT), w0$o1, null:BIGINT), w0$o0) AS avgA])
 +- OverAggregate(partitionBy=[a], orderBy=[proctime ASC], window=[ RANG BETWEEN 7200000 PRECEDING AND CURRENT ROW], select=[a, c, proctime, COUNT(c) AS w0$o0, $SUM0(c) AS w0$o1])
    +- Exchange(distribution=[hash[a]])
       +- Calc(select=[a, c, proctime])
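
The over-aggregate plan above follows the same pattern: AVG(c) expands to $SUM0 / COUNT over the frame, and since $SUM0 yields 0 rather than NULL on an empty frame, the CASE restores NULL before the division. A hedged sketch, assuming c is BIGINT:

    // Mirrors /(CASE(>(w0$o0, 0:BIGINT), w0$o1, null:BIGINT), w0$o0).
    def avgOverFrame(sum0: Long, cnt: Long): java.lang.Long =
      if (cnt > 0L) java.lang.Long.valueOf(sum0 / cnt) else null
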
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
index 9f10574e..c88f676 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
@@ -40,10 +40,10 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4], EXPR$4=[TUMBL
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[/(-($f0, /(*(CAST($f1), CAST($f1)), $f2)), $f2) AS EXPR$0, /(-($f0, /(*(CAST($f1), CAST($f1)), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1))) AS EXPR$1, POWER(/(-($f0, /(*(CAST($f1), CAST($f1)), $f2)), $f2), 0.5:DECIMAL(2, 1)) AS EXPR$2, POWER(/(-($f0, /(*(CAST($f1), CAST($f1)), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1))), 0.5:DECIMAL(2, 1)) AS EXPR$3, w$start AS EXPR$4, w$end AS EXPR$5])
+Calc(select=[/(-($f0, /(*($f1, $f1), $f2)), $f2) AS EXPR$0, /(-($f0, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1))) AS EXPR$1, CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), $f2), 0.5:DECIMAL(2, 1))) AS EXPR$2, CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1))), 0.5:DECIMAL(2, 1))) AS EXPR$3, w$start AS EXPR$4, w$end AS EXPR$5])
 +- GroupWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[SUM($f2) AS $f0, SUM(c) AS $f1, COUNT(c) AS $f2, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
    +- Exchange(distribution=[single])
-      +- Calc(select=[rowtime, c, *(CAST(c), CAST(c)) AS $f2])
+      +- Calc(select=[rowtime, c, *(c, c) AS $f2])
          +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
@@ -494,7 +494,7 @@ LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)])
         </Resource>
         <Resource name="planAfter">
             <![CDATA[
-Calc(select=[CAST(CASE(=($f1, 0), null:INTEGER, s)) AS s, CAST(/(CAST(CASE(=($f1, 0), null:INTEGER, s)), $f1)) AS a, w$start AS wStart])
+Calc(select=[CAST(CASE(=($f1, 0), null:INTEGER, s)) AS s, CAST(CAST(/(CASE(=($f1, 0), null:INTEGER, s), $f1))) AS a, w$start AS wStart])
 +- GroupWindowAggregate(window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[$SUM0($f1) AS s, COUNT(*) AS $f1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
    +- Exchange(distribution=[single])
       +- Calc(select=[rowtime, CASE(=(a, 1), 1, 99) AS $f1])
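
For the VAR_POP/STDDEV_POP plans in this file: with $f0 = SUM(c*c), $f1 = SUM(c) and $f2 = COUNT(c), the expansion is (SUM(c*c) - SUM(c)*SUM(c)/COUNT) / COUNT. The inner CASTs vanish because c*c is already BIGINT, while POWER still evaluates in DOUBLE, which is why a new outer CAST brings STDDEV back to the integral result type. A worked sketch under those assumptions:

    def varPop(sumSq: Long, sum: Long, cnt: Long): Long =
      (sumSq - sum * sum / cnt) / cnt
    // Approximates CAST(POWER(x, 0.5)): compute in DOUBLE, cast back.
    def stddevPop(sumSq: Long, sum: Long, cnt: Long): Long =
      math.sqrt(varPop(sumSq, sum, cnt).toDouble).toLong
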
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/GroupWindowTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/GroupWindowTest.xml
index e081653..ae8ceea 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/GroupWindowTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/GroupWindowTest.xml
@@ -170,10 +170,10 @@ LogicalProject(EXPR$0=[$0], EXPR$1=[$1], EXPR$2=[$2], EXPR$3=[$3], EXPR$4=[$4],
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[/(-($f0, /(*(CAST($f1), CAST($f1)), $f2)), $f2) AS EXPR$0, /(-($f0, /(*(CAST($f1), CAST($f1)), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1))) AS EXPR$1, POWER(/(-($f0, /(*(CAST($f1), CAST($f1)), $f2)), $f2), 0.5:DECIMAL(2, 1)) AS EXPR$2, POWER(/(-($f0, /(*(CAST($f1), CAST($f1)), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1))), 0.5:DECIMAL(2, 1)) AS EXPR$3, EXPR$4, EXPR$5])
+Calc(select=[/(-($f0, /(*($f1, $f1), $f2)), $f2) AS EXPR$0, /(-($f0, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1))) AS EXPR$1, CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), $f2), 0.5:DECIMAL(2, 1))) AS EXPR$2, CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null:BIGINT, -($f2, 1))), 0.5:DECIMAL(2, 1))) AS EXPR$3, EXPR$4, EXPR$5])
 +- GroupWindowAggregate(window=[TumblingGroupWindow], properties=[EXPR$4, EXPR$5], select=[SUM($f4) AS $f0, SUM(c) AS $f1, COUNT(c) AS $f2, start('w) AS EXPR$4, end('w) AS EXPR$5])
    +- Exchange(distribution=[single])
-      +- Calc(select=[rowtime, a, b, c, *(CAST(c), CAST(c)) AS $f4])
+      +- Calc(select=[rowtime, a, b, c, *(c, c) AS $f4])
          +- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[rowtime, a, b, c])
 ]]>
     </Resource>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/agg/AggTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/agg/AggTestBase.scala
index 69f6b98..94bd102 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/agg/AggTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/agg/AggTestBase.scala
@@ -28,12 +28,11 @@ import org.apache.flink.table.planner.calcite.{FlinkTypeFactory, FlinkTypeSystem
 import org.apache.flink.table.planner.codegen.CodeGeneratorContext
 import org.apache.flink.table.planner.dataview.DataViewSpec
 import org.apache.flink.table.planner.delegation.PlannerBase
-import org.apache.flink.table.planner.functions.aggfunctions.AvgAggFunction.{DoubleAvgAggFunction, IntegralAvgAggFunction}
+import org.apache.flink.table.planner.functions.aggfunctions.AvgAggFunction.{DoubleAvgAggFunction, LongAvgAggFunction}
 import org.apache.flink.table.planner.plan.utils.{AggregateInfo, AggregateInfoList}
 import org.apache.flink.table.runtime.context.ExecutionContext
 import org.apache.flink.table.types.logical.{BigIntType, DoubleType, LogicalType, RowType, VarCharType}
 import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
-
 import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.tools.RelBuilder
 import org.powermock.api.mockito.PowerMockito.{mock, when}
@@ -68,10 +67,11 @@ abstract class AggTestBase(isBatchMode: Boolean) {
     val call = mock(classOf[AggregateCall])
     when(aggInfo, "agg").thenReturn(call)
     when(call, "getName").thenReturn("avg1")
-    when(aggInfo, "function").thenReturn(new IntegralAvgAggFunction)
+    when(aggInfo, "function").thenReturn(new LongAvgAggFunction)
     when(aggInfo, "externalAccTypes").thenReturn(Array(DataTypes.BIGINT, DataTypes.BIGINT))
     when(aggInfo, "argIndexes").thenReturn(Array(1))
     when(aggInfo, "aggIndex").thenReturn(0)
+    when(aggInfo, "externalResultType").thenReturn(DataTypes.BIGINT)
     aggInfo
   }
 
@@ -84,6 +84,7 @@ abstract class AggTestBase(isBatchMode: Boolean) {
     when(aggInfo, "externalAccTypes").thenReturn(Array(DataTypes.DOUBLE, DataTypes.BIGINT))
     when(aggInfo, "argIndexes").thenReturn(Array(2))
     when(aggInfo, "aggIndex").thenReturn(1)
+    when(aggInfo, "externalResultType").thenReturn(DataTypes.DOUBLE)
     aggInfo
   }
 
@@ -96,7 +97,7 @@ abstract class AggTestBase(isBatchMode: Boolean) {
     when(aggInfo, "function").thenReturn(imperativeAggFunc)
     when(aggInfo, "externalAccTypes").thenReturn(
       Array(fromLegacyInfoToDataType(imperativeAggFunc.getAccumulatorType)))
-    when(aggInfo, "externalResultType").thenReturn(DataTypes.DOUBLE)
+    when(aggInfo, "externalResultType").thenReturn(DataTypes.BIGINT)
     when(aggInfo, "viewSpecs").thenReturn(Array[DataViewSpec]())
     when(aggInfo, "argIndexes").thenReturn(Array(3))
     when(aggInfo, "aggIndex").thenReturn(2)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGeneratorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGeneratorTest.scala
index 54ac6b2..ed67569 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGeneratorTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGeneratorTest.scala
@@ -38,9 +38,9 @@ class AggsHandlerCodeGeneratorTest extends AggTestBase(isBatchMode = false) {
     handler.accumulate(GenericRow.of("f0", jl(6L), jd(6.5D), jl(3L)))
     handler.accumulate(GenericRow.of("f0", jl(7L), jd(7.1D), jl(4L)))
     val ret = handler.getValue
-    Assert.assertEquals(6.0, ret.getDouble(0), 0)
+    Assert.assertEquals(6L, ret.getLong(0), 0)
     Assert.assertEquals(6.3, ret.getDouble(1), 0)
-    Assert.assertEquals(3.0, ret.getDouble(2), 0)
+    Assert.assertEquals(3L, ret.getLong(2), 0)
   }
 
   @Test
@@ -52,9 +52,9 @@ class AggsHandlerCodeGeneratorTest extends AggTestBase(isBatchMode = false) {
     handler.accumulate(GenericRow.of("f0", jl(7L), jd(7.4D), jl(4L)))
     handler.retract(GenericRow.of("f0", jl(9L), jd(5.5D), jl(5L)))
     val ret = handler.getValue
-    Assert.assertEquals(4.5, ret.getDouble(0), 0)
+    Assert.assertEquals(4L, ret.getLong(0), 0)
     Assert.assertEquals(6.75, ret.getDouble(1), 0)
-    Assert.assertEquals(2.0, ret.getDouble(2), 0)
+    Assert.assertEquals(2L, ret.getLong(2), 0)
   }
 
   @Test
@@ -65,10 +65,9 @@ class AggsHandlerCodeGeneratorTest extends AggTestBase(isBatchMode = false) {
     handler.merge(GenericRow.of("f0", jl(40L), jl(2L), jd(4D), jl(2L), jt(40L, 2L)))
     handler.merge(GenericRow.of("f0", jl(43L), jl(1L), jd(4D), jl(1L), jt(43L, 1L)))
     val ret = handler.getValue
-    // TODO return 26.6 instead of 26.0 after divide return double instead of long
-    Assert.assertEquals(26.6, ret.getDouble(0), 0)
+    Assert.assertEquals(26L, ret.getLong(0), 0)
     Assert.assertEquals(2.6, ret.getDouble(1), 0)
-    Assert.assertEquals(26.6, ret.getDouble(2), 0)
+    Assert.assertEquals(26L, ret.getLong(2), 0)
   }
 
   private def jl(l: Long): lang.Long = {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/AggWithoutKeysTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/AggWithoutKeysTest.scala
index 7b191a2..ca6caca 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/AggWithoutKeysTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/AggWithoutKeysTest.scala
@@ -22,7 +22,6 @@ import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory
 import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
 import org.apache.flink.table.types.logical.{BigIntType, DoubleType, LogicalType, RowType}
-
 import org.junit.Test
 
 /**
@@ -42,9 +41,9 @@ class AggWithoutKeysTest extends BatchAggTestBase {
 
   override val globalOutputType = RowType.of(
     Array[LogicalType](
+      new BigIntType(),
       new DoubleType(),
-      new DoubleType(),
-      new DoubleType()),
+      new BigIntType()),
     Array(
       "agg1Output",
       "agg2Output",
@@ -72,7 +71,7 @@ class AggWithoutKeysTest extends BatchAggTestBase {
         row(4L, 2L, 4D, 2L, row(4L, 2L)),
         row(6L, 2L, 6D, 2L, row(6L, 2L))
       ),
-      Array(row(3.0D, 3.0D, 3.0D)))
+      Array(row(3L, 3.0D, 3L)))
   }
 
   @Test
@@ -85,7 +84,7 @@ class AggWithoutKeysTest extends BatchAggTestBase {
         row("key1", 4L, 4D, 4L, "aux1"),
         row("key1", 4L, 4D, 4L, "aux1")
       ),
-      Array(row(5.0D, 5.0D, 5.0D)))
+      Array(row(5L, 5.0D, 5L)))
   }
 
   private def getOperatorWithoutKey(isMerge: Boolean, isFinal: Boolean)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/BatchAggTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/BatchAggTestBase.scala
index 74c0350..c254d15 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/BatchAggTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/BatchAggTestBase.scala
@@ -27,10 +27,8 @@ import org.apache.flink.table.planner.codegen.agg.AggTestBase
 import org.apache.flink.table.planner.utils.BaseRowTestUtil
 import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory
 import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo
-import org.apache.flink.table.types.logical.{DoubleType, LogicalType, RowType, VarCharType}
-
+import org.apache.flink.table.types.logical.{BigIntType, DoubleType, LogicalType, RowType, VarCharType}
 import org.junit.Assert
-
 import java.util
 import java.util.function
 
@@ -44,9 +42,9 @@ abstract class BatchAggTestBase extends AggTestBase(isBatchMode = true) {
   val globalOutputType = RowType.of(
     Array[LogicalType](
       new VarCharType(VarCharType.MAX_LENGTH), new VarCharType(VarCharType.MAX_LENGTH),
+      new BigIntType(),
       new DoubleType(),
-      new DoubleType(),
-      new DoubleType()),
+      new BigIntType()),
     Array(
       "f0", "f4",
       "agg1Output",
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGeneratorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGeneratorTest.scala
index 6e8c648..ce12394 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGeneratorTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGeneratorTest.scala
@@ -20,11 +20,10 @@ package org.apache.flink.table.planner.codegen.agg.batch
 
 import org.apache.flink.table.api.DataTypes
 import org.apache.flink.table.dataformat.BaseRow
-import org.apache.flink.table.planner.functions.aggfunctions.AvgAggFunction.IntegralAvgAggFunction
+import org.apache.flink.table.planner.functions.aggfunctions.AvgAggFunction.LongAvgAggFunction
 import org.apache.flink.table.planner.plan.utils.{AggregateInfo, AggregateInfoList}
 import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory
 import org.apache.flink.table.types.logical.{BigIntType, DoubleType, LogicalType, RowType, VarCharType}
-
 import org.apache.calcite.rel.core.AggregateCall
 import org.junit.Test
 import org.powermock.api.mockito.PowerMockito.{mock, when}
@@ -52,7 +51,7 @@ class HashAggCodeGeneratorTest extends BatchAggTestBase {
     val call = mock(classOf[AggregateCall])
     when(aggInfo, "agg").thenReturn(call)
     when(call, "getName").thenReturn("avg3")
-    when(aggInfo, "function").thenReturn(new IntegralAvgAggFunction)
+    when(aggInfo, "function").thenReturn(new LongAvgAggFunction)
     when(aggInfo, "externalAccTypes").thenReturn(Array(DataTypes.BIGINT, DataTypes.BIGINT))
     when(aggInfo, "argIndexes").thenReturn(Array(3))
     when(aggInfo, "aggIndex").thenReturn(2)
@@ -89,8 +88,8 @@ class HashAggCodeGeneratorTest extends BatchAggTestBase {
         row("key2", "aux2", 8L, 2L, 8D, 2L, 8L, 2L)
       ),
       Array(
-        row("key1", "aux1", 3.0D, 3.0D, 3.0D),
-        row("key2", "aux2", 4.0D, 4.0D, 4.0D))
+        row("key1", "aux1", 3L, 3.0D, 3L),
+        row("key2", "aux2", 4L, 4.0D, 4L))
     )
   }
 
@@ -106,8 +105,8 @@ class HashAggCodeGeneratorTest extends BatchAggTestBase {
         row("key2", 3L, 3D, 3L, "aux2")
       ),
       Array(
-        row("key1", "aux1", 5.5D, 5.5D, 5.5D),
-        row("key2", "aux2", 3.0D, 3.0D, 3.0D))
+        row("key1", "aux1", 5L, 5.5D, 5L),
+        row("key2", "aux2", 3L, 3.0D, 3L))
     )
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/SortAggCodeGeneratorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/SortAggCodeGeneratorTest.scala
index f23549e..918fa27 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/SortAggCodeGeneratorTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/SortAggCodeGeneratorTest.scala
@@ -69,8 +69,8 @@ class SortAggCodeGeneratorTest extends BatchAggTestBase {
         row("key2", "aux2", 8L, 2L, 8D, 2L, row(8L, 2L))
       ),
       Array(
-        row("key1", "aux1", 3.0D, 3.0D, 3.0D),
-        row("key2", "aux2", 4.0D, 4.0D, 4.0D))
+        row("key1", "aux1", 3L, 3.0D, 3L),
+        row("key2", "aux2", 4L, 4.0D, 4L))
     )
   }
 
@@ -86,8 +86,8 @@ class SortAggCodeGeneratorTest extends BatchAggTestBase {
         row("key2", 3L, 3D, 3L, "aux2")
       ),
       Array(
-        row("key1", "aux1", 5.5D, 5.5D, 5.5D),
-        row("key2", "aux2", 3.0D, 3.0D, 3.0D))
+        row("key1", "aux1", 5L, 5.5D, 5L),
+        row("key2", "aux2", 3L, 3.0D, 3L))
     )
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
index 6f9bd60..0fb58bf 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
@@ -1470,7 +1470,7 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   def testDivide(): Unit = {
 
     testAllApis(
-      1514356320000L / 60000.0, // the `/` is Scala operator, not Flink TableApi operator
+      1514356320000L / 60000, // the `/` is Scala operator, not Flink TableApi operator
       "1514356320000L / 60000",
       "1514356320000 / 60000",
       "25239272")
@@ -1479,7 +1479,7 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
       'f7 / 2,
       "f7 / 2",
       "f7 / 2",
-      "1.5")
+      "1")
 
     // f34 => Decimal(19,0)
     // 6 => Integer => Decimal(10,0)
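
The two expectation changes above are integral division surfacing through the Table API: dividing two integral values now stays integral instead of widening to DOUBLE. The same arithmetic in plain Scala (f7 presumably holds 3 in the test data, given the old 1.5 expectation):

    val minutes = 1514356320000L / 60000  // 25239272L, exact, no fraction
    val half    = 3 / 2                   // 1, previously surfaced as 1.5
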
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala
index b8e3f6b..92905e7 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala
@@ -120,8 +120,6 @@ class AggCallSelectivityEstimatorTest {
     val aggCalls = sqlAggFunWithArg.map {
       case (sqlAggFun, arg) =>
         val aggCallType = sqlAggFun match {
-          case SqlStdOperatorTable.AVG =>
-            typeFactory.createSqlType(SqlTypeName.DOUBLE)
           case SqlStdOperatorTable.COUNT =>
             typeFactory.createSqlType(SqlTypeName.BIGINT)
           case _ =>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/DecimalITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/DecimalITCase.scala
index 237d9ea..88a4dd3 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/DecimalITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/DecimalITCase.scala
@@ -412,25 +412,6 @@ class DecimalITCase extends BatchTestBase {
       s1r(d"3.12"))
   }
 
-  @Test
-  def testDiv(): Unit = {
-
-    // see DivCallGen
-    checkQuery1(
-      Seq(DECIMAL(7, 0), INT),
-      s1r(d"7", 2),
-      "select div(f0, f1), div(100*f1, f0) from Table1",
-      Seq(DECIMAL(7, 0), DECIMAL(10, 0)),
-      s1r(3, 200 / 7))
-
-    checkQuery1(
-      Seq(DECIMAL(10, 1), DECIMAL(10, 3)),
-      s1r(d"7.9", d"2.009"),
-      "select div(f0, f1), div(100*f1, f0) from Table1",
-      Seq(DECIMAL(12, 0), DECIMAL(18, 0)),
-      s1r(3, 2009 / 79))
-  }
-
   @Test // functions that treat Decimal as exact value
   def testExactFunctions(): Unit = {
     checkQuery1(
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MiscITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MiscITCase.scala
index d1e929a..2f389ce 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MiscITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MiscITCase.scala
@@ -117,7 +117,7 @@ class MiscITCase extends BatchTestBase {
     checkQuery(
       testData,
       "select sum(f0), avg(f0), count(1) from Table1",
-      Seq((5050, 50.5, 100L))
+      Seq((5050, 50, 100L))
     )
     val testData2 = Seq((1, 1), (1, 2), (2, 1), (2, 2), (3, 1), (3, 2))
     checkQuery(
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverWindowITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverWindowITCase.scala
index a4881b9..bd73238 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverWindowITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverWindowITCase.scala
@@ -218,25 +218,25 @@ class OverWindowITCase extends BatchTestBase {
           "min(e) over (partition by d order by e desc) FROM Table5",
       Seq(
         // d  e  r  r  d  s  c  a  ma m mi m
-        row( 1, 1, 1, 1, 1, 1, 1, 1.0, 1, 1, 1, 1),
+        row( 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1),
 
-        row( 2, 2, 2, 2, 2, 5, 2, 2.5, 2, 3, 2, 2),
-        row( 2, 3, 1, 1, 1, 3, 1, 3.0, 3, 3, 2, 3),
+        row( 2, 2, 2, 2, 2, 5, 2, 2, 2, 3, 2, 2),
+        row( 2, 3, 1, 1, 1, 3, 1, 3, 3, 3, 2, 3),
 
-        row( 3, 4, 3, 3, 3,15, 3, 5.0, 4, 6, 4, 4),
-        row( 3, 5, 2, 2, 2,11, 2, 5.5, 5, 6, 4, 5),
-        row( 3, 6, 1, 1, 1, 6, 1, 6.0, 6, 6, 4, 6),
+        row( 3, 4, 3, 3, 3,15, 3, 5, 4, 6, 4, 4),
+        row( 3, 5, 2, 2, 2,11, 2, 5, 5, 6, 4, 5),
+        row( 3, 6, 1, 1, 1, 6, 1, 6, 6, 6, 4, 6),
 
-        row( 4, 7, 4, 4, 4,34, 4, 8.5, 7,10, 7, 7),
-        row( 4, 8, 3, 3, 3,27, 3, 9.0, 8,10, 7, 8),
-        row( 4, 9, 2, 2, 2,19, 2, 9.5, 9,10, 7, 9),
-        row( 4,10, 1, 1, 1,10, 1,10.0,10,10, 7,10),
+        row( 4, 7, 4, 4, 4,34, 4, 8, 7,10, 7, 7),
+        row( 4, 8, 3, 3, 3,27, 3, 9, 8,10, 7, 8),
+        row( 4, 9, 2, 2, 2,19, 2, 9, 9,10, 7, 9),
+        row( 4,10, 1, 1, 1,10, 1,10,10,10, 7,10),
 
-        row( 5,11, 5, 5, 5,65, 5,13.0,11,15,11,11),
-        row( 5,12, 4, 4, 4,54, 4,13.5,12,15,11,12),
-        row( 5,13, 3, 3, 3,42, 3,14.0,13,15,11,13),
-        row( 5,14, 2, 2, 2,29, 2,14.5,14,15,11,14),
-        row( 5,15, 1, 1, 1,15, 1,15.0,15,15,11,15)
+        row( 5,11, 5, 5, 5,65, 5,13,11,15,11,11),
+        row( 5,12, 4, 4, 4,54, 4,13,12,15,11,12),
+        row( 5,13, 3, 3, 3,42, 3,14,13,15,11,13),
+        row( 5,14, 2, 2, 2,29, 2,14,14,15,11,14),
+        row( 5,15, 1, 1, 1,15, 1,15,15,15,11,15)
       ))
 
     checkResult(
@@ -571,21 +571,21 @@ class OverWindowITCase extends BatchTestBase {
     checkResult(
       "SELECT d, e, avg(e) over (partition by d order by e desc) FROM Table5",
       Seq(
-        row(1, 1, 1.0),
-        row(2, 3, 3.0),
-        row(2, 2, 2.5),
-        row(3, 6, 6.0),
-        row(3, 5, 5.5),
-        row(3, 4, 5.0),
-        row(4, 10, 10.0),
-        row(4, 9, 9.5),
-        row(4, 8, 9.0),
-        row(4, 7, 8.5),
-        row(5, 15, 15.0),
-        row(5, 14, 14.5),
-        row(5, 13, 14.0),
-        row(5, 12, 13.5),
-        row(5, 11, 13.0)
+        row(1, 1, 1),
+        row(2, 3, 3),
+        row(2, 2, 2),
+        row(3, 6, 6),
+        row(3, 5, 5),
+        row(3, 4, 5),
+        row(4, 10, 10),
+        row(4, 9, 9),
+        row(4, 8, 9),
+        row(4, 7, 8),
+        row(5, 15, 15),
+        row(5, 14, 14),
+        row(5, 13, 14),
+        row(5, 12, 13),
+        row(5, 11, 13)
       )
     )
 
@@ -664,21 +664,21 @@ class OverWindowITCase extends BatchTestBase {
     checkResult(
       "SELECT d, avg(e) over (partition by d) FROM Table5",
       Seq(
-        row(1, 1.0),
-        row(2, 2.5),
-        row(2, 2.5),
-        row(3, 5.0),
-        row(3, 5.0),
-        row(3, 5.0),
-        row(4, 8.5),
-        row(4, 8.5),
-        row(4, 8.5),
-        row(4, 8.5),
-        row(5, 13.0),
-        row(5, 13.0),
-        row(5, 13.0),
-        row(5, 13.0),
-        row(5, 13.0)
+        row(1, 1),
+        row(2, 2),
+        row(2, 2),
+        row(3, 5),
+        row(3, 5),
+        row(3, 5),
+        row(4, 8),
+        row(4, 8),
+        row(4, 8),
+        row(4, 8),
+        row(5, 13),
+        row(5, 13),
+        row(5, 13),
+        row(5, 13),
+        row(5, 13)
       )
     )
   }
@@ -2461,21 +2461,21 @@ class OverWindowITCase extends BatchTestBase {
     checkResult(
       sqlQuery,
       Seq(
-        row(1, 1L, 30, 1.0, 15, 1, 1, true, null, false, null),
-        row(2, 2L, 30, 1.0, 15, 1, 1, true, null, false, null),
-        row(2, 3L, 30, 1.0, 15, 1, 1, true, null, false, null),
-        row(3, 4L, 30, 1.0, 15, 1, 1, true, null, false, null),
-        row(3, 5L, 30, 1.0, 15, 1, 1, true, null, false, null),
-        row(3, 6L, 30, 1.0, 15, 1, 1, true, null, false, null),
-        row(4, 7L, 30, 1.0, 15, 1, 1, true, null, false, null),
-        row(4, 8L, 30, 1.0, 15, 1, 1, true, null, false, null),
-        row(4, 9L, 30, 1.0, 15, 1, 1, true, null, false, null),
-        row(4, 10L, 30, 1.0, 15, 1, 1, true, null, false, null),
-        row(5, 11L, 30, 1.0, 15, 1, 1, true, null, false, null),
-        row(5, 12L, 30, 1.0, 15, 1, 1, true, null, false, null),
-        row(5, 13L, 30, 1.0, 15, 1, 1, true, null, false, null),
-        row(5, 14L, 30, 1.0, 15, 1, 1, true, null, false, null),
-        row(5, 15L, 30, 1.0, 15, 1, 1, true, null, false, null)))
+        row(1, 1L, 30, 1, 15, 1, 1, true, null, false, null),
+        row(2, 2L, 30, 1, 15, 1, 1, true, null, false, null),
+        row(2, 3L, 30, 1, 15, 1, 1, true, null, false, null),
+        row(3, 4L, 30, 1, 15, 1, 1, true, null, false, null),
+        row(3, 5L, 30, 1, 15, 1, 1, true, null, false, null),
+        row(3, 6L, 30, 1, 15, 1, 1, true, null, false, null),
+        row(4, 7L, 30, 1, 15, 1, 1, true, null, false, null),
+        row(4, 8L, 30, 1, 15, 1, 1, true, null, false, null),
+        row(4, 9L, 30, 1, 15, 1, 1, true, null, false, null),
+        row(4, 10L, 30, 1, 15, 1, 1, true, null, false, null),
+        row(5, 11L, 30, 1, 15, 1, 1, true, null, false, null),
+        row(5, 12L, 30, 1, 15, 1, 1, true, null, false, null),
+        row(5, 13L, 30, 1, 15, 1, 1, true, null, false, null),
+        row(5, 14L, 30, 1, 15, 1, 1, true, null, false, null),
+        row(5, 15L, 30, 1, 15, 1, 1, true, null, false, null)))
   }
 }
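
All of the over-window AVG expectations updated above are plain Long division of the frame's sum by its count, for example in the d=4 and d=5 partitions:

    assert(Seq(7L, 8L, 9L, 10L).sum / 4 == 8L)          // 34 / 4, was 8.5
    assert(Seq(11L, 12L, 13L, 14L, 15L).sum / 5 == 13L) // 65 / 5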
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateITCaseBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateITCaseBase.scala
index fbff332..d8155fa 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateITCaseBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateITCaseBase.scala
@@ -83,7 +83,7 @@ abstract class AggregateITCaseBase(testName: String) extends BatchTestBase {
     // for hash agg mode it will fallback
     val largeData5 = for (i <- 0 until 100000) yield row(i, 1L, 10, "Hallo", 1L)
     registerCollection("LargeTable5", largeData5, type5, "d, e, f, g, h")
-    val expected = for (i <- 0 until 100000) yield row(i, "Hallo", 1L, 10.0, 1L)
+    val expected = for (i <- 0 until 100000) yield row(i, "Hallo", 1L, 10, 1L)
     checkResult(
       "SELECT d, g, sum(e), avg(f), min(h) FROM LargeTable5 GROUP BY d, g",
       expected
@@ -95,7 +95,7 @@ abstract class AggregateITCaseBase(testName: String) extends BatchTestBase {
     registerCollection("LargeTypedTable5", largeTypedData5, genericType5, "d, e, f, g, h")
     val expectedTypedData5 =
       for (i <- 0 until 100000) yield
-        row(row(i, i), "Hallo", 1L, 10.0, 1L)
+        row(row(i, i), "Hallo", 1L, 10, 1L)
     checkResult(
       "SELECT d, g, sum(e), avg(f), min(h) FROM LargeTypedTable5 GROUP BY d, g",
       expectedTypedData5
@@ -106,7 +106,7 @@ abstract class AggregateITCaseBase(testName: String) extends BatchTestBase {
     registerCollection("SingleGroupLargeTable5", singleGrouplargeData5, type5, "d, e, f, g, h")
     checkResult(
       "SELECT d, g, sum(e), avg(f), min(h) FROM SingleGroupLargeTable5 GROUP BY d, g",
-      Seq(row(999, "Hallo", 100000L, 10.0, 1L))
+      Seq(row(999, "Hallo", 100000L, 10, 1L))
     )
   }
 
@@ -127,9 +127,9 @@ abstract class AggregateITCaseBase(testName: String) extends BatchTestBase {
     checkResult(
       "SELECT sum(d), avg(d), count(g), min(e), h FROM Table5 GROUP BY h",
       Seq(
-        row(16, 16.0 / 5, 5, 1L, 1),
-        row(26, 26.0 / 7, 7, 2L, 2),
-        row(13, 13.0 / 3, 3, 6L, 3)
+        row(16, 16 / 5, 5, 1L, 1),
+        row(26, 26 / 7, 7, 2L, 2),
+        row(13, 13 / 3, 3, 6L, 3)
       )
     )
   }
@@ -163,7 +163,7 @@ abstract class AggregateITCaseBase(testName: String) extends BatchTestBase {
     checkResult(
       "SELECT sum(d), avg(d), count(g), min(e) FROM Table5",
       Seq(
-        row(55, 55.0 / 15, 15, 1L)
+        row(55, 55 / 15, 15, 1L)
       )
     )
   }
@@ -196,7 +196,7 @@ abstract class AggregateITCaseBase(testName: String) extends BatchTestBase {
     checkResult(
       "SELECT avg(d + 2) + 2 FROM Table5",
       Seq(
-        row(85.0 / 15 + 2)
+        row(85 / 15 + 2)
       )
     )
   }
@@ -537,7 +537,7 @@ abstract class AggregateITCaseBase(testName: String) extends BatchTestBase {
     checkQuery(
       Seq[(Integer, Integer)]((1, 1), (1, 2), (2, 1), (2, 2), (3, 1), (3, 2)),
       "select avg(f0), avg(f0) from TableName", // spark has mean(), but we don't
-      Seq((2.0, 2.0))
+      Seq((2, 2))
     )
 
     checkQuery(
@@ -557,7 +557,7 @@ abstract class AggregateITCaseBase(testName: String) extends BatchTestBase {
     checkQuery(
       Seq[(Integer, Integer)]((1, 1), (1, 2), (2, 1), (2, 2), (3, 1), (3, 2)),
       "select avg(f0), sum(distinct f0) from TableName",
-      Seq((2.0, 6))
+      Seq((2, 6))
     )
     checkQuery(
       Seq((b1, b1), (b1, b2), (b2, b1), (b2, b2), (b3, b1), (b3, b2)),
@@ -579,7 +579,7 @@ abstract class AggregateITCaseBase(testName: String) extends BatchTestBase {
     checkQuery(
       testData3,
       "select avg(f1) from TableName",
-      Seq(Tuple1(2.0))
+      Seq(Tuple1(2))
     )
   }
 
@@ -591,12 +591,12 @@ abstract class AggregateITCaseBase(testName: String) extends BatchTestBase {
     checkQuery(
       testData3,
       "select avg(f1), count(distinct f1) from TableName",
-      Seq((2.0, 1L))
+      Seq((2, 1L))
     )
     checkQuery(
       testData3,
       "select avg(f1), sum(distinct f1) from TableName",
-      Seq((2.0, 2))
+      Seq((2, 2))
     )
   }
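
The expectations above deliberately switch to Scala's own integral division so that they keep matching SQL AVG over integral inputs; a quick sanity check of the constants involved:

    assert(16 / 5 == 3 && 26 / 7 == 3 && 13 / 3 == 4)  // avg(d) grouped by h
    assert(55 / 15 == 3 && 85 / 15 + 2 == 7)           // the global avg cases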
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateReduceGroupingITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateReduceGroupingITCase.scala
index f4f1ef0..3946c63 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateReduceGroupingITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateReduceGroupingITCase.scala
@@ -151,18 +151,18 @@ class AggregateReduceGroupingITCase extends BatchTestBase {
       Seq(row(2, 1, 1), row(3, 2, 1), row(5, 2, 1), row(6, 3, 1)))
     // group by string
     checkResult("SELECT a1, c1, count(d1), avg(b1) FROM T1 GROUP BY a1, c1",
-      Seq(row(2, "A", 0, 1.0), row(3, "A", 1, 2.0), row(5, "B", 1, 2.0), row(6, "C", 1, 3.0)))
+      Seq(row(2, "A", 0, 1), row(3, "A", 1, 2), row(5, "B", 1, 2), row(6, "C", 1, 3)))
     checkResult("SELECT c5, d5, avg(b5), avg(a5) FROM T5 WHERE d5 IS NOT NULL GROUP BY c5, d5",
-      Seq(row("B", "Hi", 2.0, 3.0), row("C", "Hello", null, 1.0),
-        row("D", "Hello world", 3.0, 4.0), row("E", "Hello world, how are you?", 1.0, 3.0),
-        row("I", "hahaha", 2.0, 7.0), row("J", "I am fine.", 1.0, 6.0)))
+      Seq(row("B", "Hi", 2, 3), row("C", "Hello", null, 1),
+        row("D", "Hello world", 3, 4), row("E", "Hello world, how are you?", 1, 3),
+        row("I", "hahaha", 2, 7), row("J", "I am fine.", 1, 6)))
     // group by string with null
     checkResult("SELECT a1, d1, count(d1) FROM T1 GROUP BY a1, d1",
       Seq(row(2, null, 0), row(3, "Hi", 1), row(5, "Hello", 1), row(6, "Hello world", 1)))
     checkResult("SELECT c5, d5, avg(b5), avg(a5) FROM T5 GROUP BY c5, d5",
-      Seq(row("A", null, 1.0, 2.0), row("B", "Hi", 2.0, 3.0), row("C", "Hello", null, 1.0),
-        row("D", "Hello world", 3.0, 4.0), row("E", "Hello world, how are you?", 1.0, 3.0),
-        row("F", null, null, 5.0), row("I", "hahaha", 2.0, 7.0), row("J", "I am fine.", 1.0, 6.0)))
+      Seq(row("A", null, 1, 2), row("B", "Hi", 2, 3), row("C", "Hello", null, 1),
+        row("D", "Hello world", 3, 4), row("E", "Hello world, how are you?", 1, 3),
+        row("F", null, null, 5), row("I", "hahaha", 2, 7), row("J", "I am fine.", 1, 6)))
 
     checkResult("SELECT a3, b3, count(c3) FROM T3 GROUP BY a3, b3",
       Seq(row(1, 10, 1), row(2, 20, 2), row(3, 10, 1), row(4, 20, 1), row(4, null, 1)))
@@ -176,14 +176,14 @@ class AggregateReduceGroupingITCase extends BatchTestBase {
 
     // large data, for hash agg mode it will fallback
     checkResult("SELECT a6, c6, avg(b6), count(d6), avg(e6) FROM T6 GROUP BY a6, c6",
-      (0 until 50000).map(i => row(i, if (i % 500 == 0) null else s"Hello$i", 1D, 1L, 10D))
+      (0 until 50000).map(i => row(i, if (i % 500 == 0) null else s"Hello$i", 1, 1L, 10))
     )
     checkResult("SELECT a6, d6, avg(b6), count(c6), avg(e6) FROM T6 GROUP BY a6, d6",
-      (0 until 50000).map(i => row(i, "Hello world", 1D, if (i % 500 == 0) 0L else 1L, 10D))
+      (0 until 50000).map(i => row(i, "Hello world", 1, if (i % 500 == 0) 0L else 1L, 10))
     )
     checkResult("SELECT a6, f6, avg(b6), count(c6), avg(e6) FROM T6 GROUP BY a6, f6",
-      (0 until 50000).map(i => row(i, new Date(i + 1531820000000L), 1D,
-        if (i % 500 == 0) 0L else 1L, 10D))
+      (0 until 50000).map(i => row(i, new Date(i + 1531820000000L), 1,
+        if (i % 500 == 0) 0L else 1L, 10))
     )
   }
 
@@ -283,17 +283,17 @@ class AggregateReduceGroupingITCase extends BatchTestBase {
 
     checkResult("SELECT a4, c4, COUNT(b4), AVG(b4) FROM T4 " +
       "GROUP BY a4, c4, TUMBLE(d4, INTERVAL '15' MINUTE)",
-      Seq(row(1, "A", 1, 1.0), row(2, "B", 1, 1.0), row(3, "B", 1, 2.0), row(4, "C", 1, 3.0)))
+      Seq(row(1, "A", 1, 1), row(2, "B", 1, 1), row(3, "B", 1, 2), row(4, "C", 1, 3)))
 
     checkResult("SELECT a4, e4, s, avg(ab), count(cb) FROM " +
       "(SELECT a4, e4, avg(b4) as ab, count(b4) AS cb, " +
       "TUMBLE_START(d4, INTERVAL '15' MINUTE) AS s, " +
       "TUMBLE_END(d4, INTERVAL '15' MINUTE) AS e FROM T4 " +
       "GROUP BY a4, e4, TUMBLE(d4, INTERVAL '15' MINUTE)) t GROUP BY a4, e4, s",
-      Seq(row(1, "Hi", LocalDateTime.of(2018, 6, 1, 10, 0, 0), 1D, 1),
-        row(2, "Hello", LocalDateTime.of(2018, 6, 1, 10, 0, 0), 1D, 1),
-        row(3, "Hello world", LocalDateTime.of(2018, 6, 1, 10, 15, 0), 2D, 1),
-        row(4, "I am fine.", LocalDateTime.of(2018, 6, 1, 10, 30, 0), 3D, 1)))
+      Seq(row(1, "Hi", LocalDateTime.of(2018, 6, 1, 10, 0, 0), 1, 1),
+        row(2, "Hello", LocalDateTime.of(2018, 6, 1, 10, 0, 0), 1, 1),
+        row(3, "Hello world", LocalDateTime.of(2018, 6, 1, 10, 15, 0), 2, 1),
+        row(4, "I am fine.", LocalDateTime.of(2018, 6, 1, 10, 30, 0), 3, 1)))
 
     checkResult("SELECT a4, c4, s, COUNT(b4) FROM " +
       "(SELECT a4, c4, avg(b4) AS b4, " +
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/GroupingSetsITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/GroupingSetsITCase.scala
index c5fd60e..2f03e77 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/GroupingSetsITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/GroupingSetsITCase.scala
@@ -127,7 +127,7 @@ class GroupingSetsITCase extends BatchTestBase {
       "select deptno, avg(age) as a, group_id() as g," +
         " grouping(deptno) as gb, grouping_id(deptno)as gib" +
         " from emps group by grouping sets (deptno)",
-      Seq(row(10, 25.0, 0, 0, 0), row(20, 42.5, 0, 0, 0), row(40, null, 0, 0, 0))
+      Seq(row(10, 25, 0, 0, 0), row(20, 42, 0, 0, 0), row(40, null, 0, 0, 0))
     )
   }
 
@@ -208,11 +208,11 @@ class GroupingSetsITCase extends BatchTestBase {
     checkResult(
       "select deptno / 2 + 1 as half1, count(*) as c from emp " +
         "group by rollup(deptno / 2, gender), rollup(substring(ename FROM 1 FOR 1))",
-      Seq(row(11.0, 1), row(11.0, 1), row(11.0, 1), row(11.0, 1), row(16.0, 1), row(16.0, 1),
-        row(16.0, 1), row(16.0, 1), row(16.0, 2), row(16.0, 2), row(26.0, 1), row(26.0, 1),
-        row(26.0, 1), row(26.0, 1), row(26.0, 1), row(26.0, 1), row(26.0, 2), row(31.0, 1),
-        row(31.0, 1), row(31.0, 1), row(31.0, 1), row(6.0, 1), row(6.0, 1), row(6.0, 1),
-        row(6.0, 1), row(6.0, 1), row(6.0, 1), row(6.0, 2), row(null, 1), row(null, 1),
+      Seq(row(11, 1), row(11, 1), row(11, 1), row(11, 1), row(16, 1), row(16, 1),
+        row(16, 1), row(16, 1), row(16, 2), row(16, 2), row(26, 1), row(26, 1),
+        row(26, 1), row(26, 1), row(26, 1), row(26, 1), row(26, 2), row(31, 1),
+        row(31, 1), row(31, 1), row(31, 1), row(6, 1), row(6, 1), row(6, 1),
+        row(6, 1), row(6, 1), row(6, 1), row(6, 2), row(null, 1), row(null, 1),
         row(null, 1), row(null, 1), row(null, 1), row(null, 1), row(null, 1),
         row(null, 1), row(null, 1), row(null, 2), row(null, 2), row(null, 9))
     )
@@ -460,34 +460,34 @@ class GroupingSetsITCase extends BatchTestBase {
         " GROUPING SETS (f1, f2, ())"
 
     val expected = Seq(
-      row(1, null, 1.0, 0, 0, 1, 0, 1, 1, 1),
-      row(6, null, 18.5, 0, 0, 1, 0, 1, 1, 6),
-      row(2, null, 2.5, 0, 0, 1, 0, 1, 1, 2),
-      row(4, null, 8.5, 0, 0, 1, 0, 1, 1, 4),
-      row(5, null, 13.0, 0, 0, 1, 0, 1, 1, 5),
-      row(3, null, 5.0, 0, 0, 1, 0, 1, 1, 3),
-      row(null, "Comment#11", 17.0, 0, 1, 0, 1, 0, 2, 1),
-      row(null, "Comment#8", 14.0, 0, 1, 0, 1, 0, 2, 1),
-      row(null, "Comment#2", 8.0, 0, 1, 0, 1, 0, 2, 1),
-      row(null, "Comment#1", 7.0, 0, 1, 0, 1, 0, 2, 1),
-      row(null, "Comment#14", 20.0, 0, 1, 0, 1, 0, 2, 1),
-      row(null, "Comment#7", 13.0, 0, 1, 0, 1, 0, 2, 1),
-      row(null, "Comment#6", 12.0, 0, 1, 0, 1, 0, 2, 1),
-      row(null, "Comment#3", 9.0, 0, 1, 0, 1, 0, 2, 1),
-      row(null, "Comment#12", 18.0, 0, 1, 0, 1, 0, 2, 1),
-      row(null, "Comment#5", 11.0, 0, 1, 0, 1, 0, 2, 1),
-      row(null, "Comment#15", 21.0, 0, 1, 0, 1, 0, 2, 1),
-      row(null, "Comment#4", 10.0, 0, 1, 0, 1, 0, 2, 1),
-      row(null, "Hi", 1.0, 0, 1, 0, 1, 0, 2, 1),
-      row(null, "Comment#10", 16.0, 0, 1, 0, 1, 0, 2, 1),
-      row(null, "Hello world", 3.0, 0, 1, 0, 1, 0, 2, 1),
-      row(null, "I am fine.", 5.0, 0, 1, 0, 1, 0, 2, 1),
-      row(null, "Hello world, how are you?", 4.0, 0, 1, 0, 1, 0, 2, 1),
-      row(null, "Comment#9", 15.0, 0, 1, 0, 1, 0, 2, 1),
-      row(null, "Comment#13", 19.0, 0, 1, 0, 1, 0, 2, 1),
-      row(null, "Luke Skywalker", 6.0, 0, 1, 0, 1, 0, 2, 1),
-      row(null, "Hello", 2.0, 0, 1, 0, 1, 0, 2, 1),
-      row(null, null, 11.0, 0, 1, 1, 1, 1, 3, 21))
+      row(1, null, 1, 0, 0, 1, 0, 1, 1, 1),
+      row(6, null, 18, 0, 0, 1, 0, 1, 1, 6),
+      row(2, null, 2, 0, 0, 1, 0, 1, 1, 2),
+      row(4, null, 8, 0, 0, 1, 0, 1, 1, 4),
+      row(5, null, 13, 0, 0, 1, 0, 1, 1, 5),
+      row(3, null, 5, 0, 0, 1, 0, 1, 1, 3),
+      row(null, "Comment#11", 17, 0, 1, 0, 1, 0, 2, 1),
+      row(null, "Comment#8", 14, 0, 1, 0, 1, 0, 2, 1),
+      row(null, "Comment#2", 8, 0, 1, 0, 1, 0, 2, 1),
+      row(null, "Comment#1", 7, 0, 1, 0, 1, 0, 2, 1),
+      row(null, "Comment#14", 20, 0, 1, 0, 1, 0, 2, 1),
+      row(null, "Comment#7", 13, 0, 1, 0, 1, 0, 2, 1),
+      row(null, "Comment#6", 12, 0, 1, 0, 1, 0, 2, 1),
+      row(null, "Comment#3", 9, 0, 1, 0, 1, 0, 2, 1),
+      row(null, "Comment#12", 18, 0, 1, 0, 1, 0, 2, 1),
+      row(null, "Comment#5", 11, 0, 1, 0, 1, 0, 2, 1),
+      row(null, "Comment#15", 21, 0, 1, 0, 1, 0, 2, 1),
+      row(null, "Comment#4", 10, 0, 1, 0, 1, 0, 2, 1),
+      row(null, "Hi", 1, 0, 1, 0, 1, 0, 2, 1),
+      row(null, "Comment#10", 16, 0, 1, 0, 1, 0, 2, 1),
+      row(null, "Hello world", 3, 0, 1, 0, 1, 0, 2, 1),
+      row(null, "I am fine.", 5, 0, 1, 0, 1, 0, 2, 1),
+      row(null, "Hello world, how are you?", 4, 0, 1, 0, 1, 0, 2, 1),
+      row(null, "Comment#9", 15, 0, 1, 0, 1, 0, 2, 1),
+      row(null, "Comment#13", 19, 0, 1, 0, 1, 0, 2, 1),
+      row(null, "Luke Skywalker", 6, 0, 1, 0, 1, 0, 2, 1),
+      row(null, "Hello", 2, 0, 1, 0, 1, 0, 2, 1),
+      row(null, null, 11, 0, 1, 1, 1, 1, 3, 21))
     checkResult(query, expected)
   }
 
@@ -496,32 +496,32 @@ class GroupingSetsITCase extends BatchTestBase {
     val query = "SELECT f1, f2, avg(f0) as a, GROUP_ID() as g FROM " +
       TABLE_WITH_NULLS_NAME + " GROUP BY GROUPING SETS (f1, f2)"
     val expected = Seq(
-      row(6, null, 18.5, 0),
-      row(5, null, 13.0, 0),
-      row(4, null, 8.5, 0),
-      row(3, null, 5.0, 0),
-      row(2, null, 2.5, 0),
-      row(1, null, 1.0, 0),
-      row(null, "Luke Skywalker", 6.0, 0),
-      row(null, "I am fine.", 5.0, 0),
-      row(null, "Hi", 1.0, 0),
-      row(null, null, 3.5, 0),
-      row(null, "Hello", 2.0, 0),
-      row(null, "Comment#9", 15.0, 0),
-      row(null, "Comment#8", 14.0, 0),
-      row(null, "Comment#7", 13.0, 0),
-      row(null, "Comment#6", 12.0, 0),
-      row(null, "Comment#5", 11.0, 0),
-      row(null, "Comment#4", 10.0, 0),
-      row(null, "Comment#3", 9.0, 0),
-      row(null, "Comment#2", 8.0, 0),
-      row(null, "Comment#15", 21.0, 0),
-      row(null, "Comment#14", 20.0, 0),
-      row(null, "Comment#13", 19.0, 0),
-      row(null, "Comment#12", 18.0, 0),
-      row(null, "Comment#11", 17.0, 0),
-      row(null, "Comment#10", 16.0, 0),
-      row(null, "Comment#1", 7.0, 0))
+      row(6, null, 18, 0),
+      row(5, null, 13, 0),
+      row(4, null, 8, 0),
+      row(3, null, 5, 0),
+      row(2, null, 2, 0),
+      row(1, null, 1, 0),
+      row(null, "Luke Skywalker", 6, 0),
+      row(null, "I am fine.", 5, 0),
+      row(null, "Hi", 1, 0),
+      row(null, null, 3, 0),
+      row(null, "Hello", 2, 0),
+      row(null, "Comment#9", 15, 0),
+      row(null, "Comment#8", 14, 0),
+      row(null, "Comment#7", 13, 0),
+      row(null, "Comment#6", 12, 0),
+      row(null, "Comment#5", 11, 0),
+      row(null, "Comment#4", 10, 0),
+      row(null, "Comment#3", 9, 0),
+      row(null, "Comment#2", 8, 0),
+      row(null, "Comment#15", 21, 0),
+      row(null, "Comment#14", 20, 0),
+      row(null, "Comment#13", 19, 0),
+      row(null, "Comment#12", 18, 0),
+      row(null, "Comment#11", 17, 0),
+      row(null, "Comment#10", 16, 0),
+      row(null, "Comment#1", 7, 0))
     checkResult(query, expected)
   }
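
The grouping-sets expectations show the same truncation (25.0 to 25, 42.5 to 42), and derived grouping keys such as deptno / 2 + 1 lose their .0 because the division is integral from the start, e.g.:

    assert(20 / 2 + 1 == 11)  // a deptno of 20 now yields 11, not 11.0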
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/WindowAggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/WindowAggregateITCase.scala
index 620beb0..4cbd7ed 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/WindowAggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/WindowAggregateITCase.scala
@@ -316,17 +316,17 @@ class WindowAggregateITCase extends BatchTestBase {
           "FROM Table3WithTimestamp " +
           "GROUP BY HOP(ts, INTERVAL '2' SECOND, INTERVAL '3' SECOND)",
       Seq(
-        row(1.5, 3, localDateTime("1970-01-01 00:00:00.0")),
-        row(11.0, 33, localDateTime("1970-01-01 00:00:10.0")),
-        row(13.0, 39, localDateTime("1970-01-01 00:00:12.0")),
-        row(15.0, 45, localDateTime("1970-01-01 00:00:14.0")),
-        row(17.0, 51, localDateTime("1970-01-01 00:00:16.0")),
-        row(19.0, 57, localDateTime("1970-01-01 00:00:18.0")),
-        row(20.5, 41, localDateTime("1970-01-01 00:00:20.0")),
-        row(3.0, 9, localDateTime("1970-01-01 00:00:02.0")),
-        row(5.0, 15, localDateTime("1970-01-01 00:00:04.0")),
-        row(7.0, 21, localDateTime("1970-01-01 00:00:06.0")),
-        row(9.0, 27, localDateTime("1970-01-01 00:00:08.0"))
+        row(1, 3, localDateTime("1970-01-01 00:00:00.0")),
+        row(11, 33, localDateTime("1970-01-01 00:00:10.0")),
+        row(13, 39, localDateTime("1970-01-01 00:00:12.0")),
+        row(15, 45, localDateTime("1970-01-01 00:00:14.0")),
+        row(17, 51, localDateTime("1970-01-01 00:00:16.0")),
+        row(19, 57, localDateTime("1970-01-01 00:00:18.0")),
+        row(20, 41, localDateTime("1970-01-01 00:00:20.0")),
+        row(3, 9, localDateTime("1970-01-01 00:00:02.0")),
+        row(5, 15, localDateTime("1970-01-01 00:00:04.0")),
+        row(7, 21, localDateTime("1970-01-01 00:00:06.0")),
+        row(9, 27, localDateTime("1970-01-01 00:00:08.0"))
       )
     )
 
@@ -335,17 +335,17 @@ class WindowAggregateITCase extends BatchTestBase {
           "FROM Table3WithTimestamp " +
           "GROUP BY HOP(ts, INTERVAL '2' SECOND, INTERVAL '3' SECOND)",
       Seq(
-        row(1.5, 3, localDateTime("1970-01-01 00:00:00.0")),
-        row(11.0, 33, localDateTime("1970-01-01 00:00:10.0")),
-        row(13.0, 39, localDateTime("1970-01-01 00:00:12.0")),
-        row(15.0, 45, localDateTime("1970-01-01 00:00:14.0")),
-        row(17.0, 51, localDateTime("1970-01-01 00:00:16.0")),
-        row(19.0, 57, localDateTime("1970-01-01 00:00:18.0")),
-        row(20.5, 41, localDateTime("1970-01-01 00:00:20.0")),
-        row(3.0, 9, localDateTime("1970-01-01 00:00:02.0")),
-        row(5.0, 15, localDateTime("1970-01-01 00:00:04.0")),
-        row(7.0, 21, localDateTime("1970-01-01 00:00:06.0")),
-        row(9.0, 27, localDateTime("1970-01-01 00:00:08.0"))
+        row(1, 3, localDateTime("1970-01-01 00:00:00.0")),
+        row(11, 33, localDateTime("1970-01-01 00:00:10.0")),
+        row(13, 39, localDateTime("1970-01-01 00:00:12.0")),
+        row(15, 45, localDateTime("1970-01-01 00:00:14.0")),
+        row(17, 51, localDateTime("1970-01-01 00:00:16.0")),
+        row(19, 57, localDateTime("1970-01-01 00:00:18.0")),
+        row(20, 41, localDateTime("1970-01-01 00:00:20.0")),
+        row(3, 9, localDateTime("1970-01-01 00:00:02.0")),
+        row(5, 15, localDateTime("1970-01-01 00:00:04.0")),
+        row(7, 21, localDateTime("1970-01-01 00:00:06.0")),
+        row(9, 27, localDateTime("1970-01-01 00:00:08.0"))
       )
     )
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/AggregationITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/AggregationITCase.scala
index 0d2ed05..8ef2947 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/AggregationITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/AggregationITCase.scala
@@ -47,7 +47,7 @@ class AggregationITCase extends BatchTestBase {
       .where('a.get("_1") > 0)
       .select('a.get("_1").avg, 'a.get("_2").sum, 'b.count)
 
-    val expected = "2.0,6,3"
+    val expected = "2,6,3"
     val results = executeQuery(result)
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
@@ -59,7 +59,7 @@ class AggregationITCase extends BatchTestBase {
       .select('_1.sum, '_1.sum0, '_1.min, '_1.max, '_1.count, '_1.avg)
 
     val results = executeQuery(t)
-    val expected = "231,231,1,21,21,11.0"
+    val expected = "231,231,1,21,21,11"
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -71,7 +71,7 @@ class AggregationITCase extends BatchTestBase {
       (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao"))
       .select('_1.avg, '_2.avg, '_3.avg, '_4.avg, '_5.avg, '_6.avg, '_7.count)
 
-    val expected = "1.5,1.5,1.5,1.5,1.5,1.5,2"
+    val expected = "1,1,1,1,1.5,1.5,2"
     val results = executeQuery(t)
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
@@ -83,7 +83,7 @@ class AggregationITCase extends BatchTestBase {
       (2: Byte, 2: Short))
       .select('_1.avg, '_1.sum, '_1.count, '_2.avg, '_2.sum)
 
-    val expected = "1.5,3,2,1.5,3"
+    val expected = "1,3,2,1,3"
     val results = executeQuery(t)
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
@@ -116,7 +116,7 @@ class AggregationITCase extends BatchTestBase {
       .select('_1, '_2, '_3)
       .select('_1.avg, '_2.sum, '_3.count)
 
-    val expected = "1.5,3,2"
+    val expected = "1,3,2"
     val result = executeQuery(t)
     TestBaseUtils.compareResultAsText(result.asJava, expected)
   }
@@ -132,7 +132,7 @@ class AggregationITCase extends BatchTestBase {
           |Count(a) as e1, a.count as e2
         """.stripMargin)
 
-    val expected = "231,231,1,1,21,21,11.0,11.0,21,21"
+    val expected = "231,231,1,1,21,21,11,11,21,21"
     val results = executeQuery(t)
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
@@ -297,9 +297,9 @@ class AggregationITCase extends BatchTestBase {
       .select('c.min, 'e, 'a.avg, 'd.count)
 
     val expected = Seq(
-      s"0,1,${1.0 / 1},1", s"7,1,${9.0 / 2},2", s"2,1,${6.0 / 2},2",
-      s"3,2,${11.0 / 3},3", s"1,2,${10.0 / 3},3", s"14,2,${5.0 / 1},1",
-      s"12,3,${5.0 / 1},1", s"5,3,${8.0 / 2},2").mkString("\n")
+      s"0,1,${1 / 1},1", s"7,1,${9 / 2},2", s"2,1,${6 / 2},2",
+      s"3,2,${11 / 3},3", s"1,2,${10 / 3},3", s"14,2,${5 / 1},1",
+      s"12,3,${5 / 1},1", s"5,3,${8 / 2},2").mkString("\n")
     val results = executeQuery(t)
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
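
The Table API expectations above make the type-following behavior explicit: BYTE, SHORT, INT and LONG averages truncate, while FLOAT and DOUBLE keep the fraction, which is exactly why "1.5,1.5,1.5,1.5,1.5,1.5,2" became "1,1,1,1,1.5,1.5,2". In plain Scala terms:

    val integralAvg = (1 + 2) / 2        // 1  : the four integral columns
    val floatingAvg = (1.0f + 2.0f) / 2  // 1.5: FLOAT/DOUBLE are unchanged
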
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index 7d8796d..73b3a9b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
@@ -100,9 +100,9 @@ class AggregateITCase(
     t1.toRetractStream[Row].addSink(sink)
     env.execute()
     val expected = List(
-      "1,1.0,1,1,1",
-      "2,2.0,2,1,1",
-      "3,3.0,3,1,1")
+      "1,1,1,1,1",
+      "2,2,2,1,1",
+      "3,3,3,1,1")
     assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
 
@@ -145,7 +145,7 @@ class AggregateITCase(
     val sink = new TestingRetractSink
     t1.toRetractStream[Row].addSink(sink).setParallelism(1)
     env.execute()
-    val expected = List("6,2.0,1,3,3")
+    val expected = List("6,2,1,3,3")
     assertEquals(expected, sink.getRetractResults)
   }
 
@@ -261,7 +261,7 @@ class AggregateITCase(
     t1.toRetractStream[Row].addSink(sink).setParallelism(1)
     env.execute()
 
-    val expected = List("3,9,4,2,3.0,5")
+    val expected = List("3,9,4,2,3,5")
     assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
 
@@ -1021,7 +1021,7 @@ class AggregateITCase(
     tEnv.sqlQuery(sqlQuery).toRetractStream[Row].addSink(sink)
     env.execute()
     // TODO: define precise behavior of VAR_POP()
-    val expected = List(15602500d.toString, 28888.8888888893d.toString)
+    val expected = List(15602500.toString, 28889.toString)
     assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
 
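Note that the VAR_POP expectation above appears to round to the nearest
integer (28888.888... becomes 28889) rather than truncate like AVG. A quick
self-contained check of that assumption:

    object VarPopRoundingSketch {
      def main(args: Array[String]): Unit = {
        println(math.round(28888.8888888893)) // 28889 -- matches the new expectation
        println(28888.8888888893.toLong)      // 28888 -- truncation would not
      }
    }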
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/MatchRecognizeITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/MatchRecognizeITCase.scala
index 228b644..f563de5 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/MatchRecognizeITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/MatchRecognizeITCase.scala
@@ -584,7 +584,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState
     result.addSink(sink)
     env.execute()
 
-    val expected = mutable.MutableList("1,5,0,null,2,3.0,3.4,8", "9,4,0,null,3,4.0,3.2,12")
+    val expected = mutable.MutableList("1,5,0,null,2,3,3.4,8", "9,4,0,null,3,4,3.2,12")
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }
 
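In the over-window expectations that follow, the interpolated fractions are
rewritten from Double to Int operands, so Scala's truncating integer division
now produces the rendered value. For example:

    object InterpolationSketch {
      def main(args: Array[String]): Unit = {
        println(s"avg=${29 / 8}")   // avg=3     -- new integral AVG
        println(s"avg=${29.0 / 8}") // avg=3.625 -- old Double AVG
      }
    }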
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverWindowITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverWindowITCase.scala
index 0c11794..4a234f4 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverWindowITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverWindowITCase.scala
@@ -594,19 +594,19 @@ class OverWindowITCase(mode: StateBackendMode) extends StreamingWithStateTestBas
     env.execute()
 
     val expected = List(
-      s"1,1,Hello,0,6,3,${6.0/3},3,1",
-      s"1,2,Hello,0,6,3,${6.0/3},3,1",
-      s"1,3,Hello world,0,6,3,${6.0/3},3,1",
-      s"1,1,Hi,0,7,4,${7.0/4},3,1",
-      s"2,1,Hello,0,1,1,${1.0/1},1,1",
-      s"2,2,Hello world,0,6,3,${6.0/3},3,1",
-      s"2,3,Hello world,0,6,3,${6.0/3},3,1",
-      s"1,4,Hello world,0,11,5,${11.0/5},4,1",
-      s"1,5,Hello world,3,29,8,${29.0/8},7,1",
-      s"1,6,Hello world,3,29,8,${29.0/8},7,1",
-      s"1,7,Hello world,3,29,8,${29.0/8},7,1",
-      s"2,4,Hello world,1,15,5,${15.0/5},5,1",
-      s"2,5,Hello world,1,15,5,${15.0/5},5,1")
+      s"1,1,Hello,0,6,3,${6/3},3,1",
+      s"1,2,Hello,0,6,3,${6/3},3,1",
+      s"1,3,Hello world,0,6,3,${6/3},3,1",
+      s"1,1,Hi,0,7,4,${7/4},3,1",
+      s"2,1,Hello,0,1,1,${1/1},1,1",
+      s"2,2,Hello world,0,6,3,${6/3},3,1",
+      s"2,3,Hello world,0,6,3,${6/3},3,1",
+      s"1,4,Hello world,0,11,5,${11/5},4,1",
+      s"1,5,Hello world,3,29,8,${29/8},7,1",
+      s"1,6,Hello world,3,29,8,${29/8},7,1",
+      s"1,7,Hello world,3,29,8,${29/8},7,1",
+      s"2,4,Hello world,1,15,5,${15/5},5,1",
+      s"2,5,Hello world,1,15,5,${15/5},5,1")
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }
 
@@ -659,20 +659,20 @@ class OverWindowITCase(mode: StateBackendMode) extends StreamingWithStateTestBas
     env.execute()
 
     val expected = mutable.MutableList(
-      s"1,2,Hello,0,2,1,${2.0/1},2,2",
-      s"1,3,Hello world,0,5,2,${5.0/2},3,2",
-      s"1,1,Hi,0,6,3,${6.0/3},3,1",
-      s"2,1,Hello,0,1,1,${1.0/1},1,1",
-      s"2,2,Hello world,0,3,2,${3.0/2},2,1",
-      s"3,1,Hello,0,1,1,${1.0/1},1,1",
-      s"3,2,Hello world,0,3,2,${3.0/2},2,1",
-      s"1,5,Hello world,1,11,4,${11.0/4},5,1",
-      s"1,6,Hello world,2,17,5,${17.0/5},6,1",
-      s"1,9,Hello world,3,26,6,${26.0/6},9,1",
-      s"1,8,Hello world,4,34,7,${34.0/7},9,1",
-      s"1,7,Hello world,5,41,8,${41.0/8},9,1",
-      s"2,5,Hello world,1,8,3,${8.0/3},5,1",
-      s"3,5,Hello world,1,8,3,${8.0/3},5,1")
+      s"1,2,Hello,0,2,1,${2/1},2,2",
+      s"1,3,Hello world,0,5,2,${5/2},3,2",
+      s"1,1,Hi,0,6,3,${6/3},3,1",
+      s"2,1,Hello,0,1,1,${1/1},1,1",
+      s"2,2,Hello world,0,3,2,${3/2},2,1",
+      s"3,1,Hello,0,1,1,${1/1},1,1",
+      s"3,2,Hello world,0,3,2,${3/2},2,1",
+      s"1,5,Hello world,1,11,4,${11/4},5,1",
+      s"1,6,Hello world,2,17,5,${17/5},6,1",
+      s"1,9,Hello world,3,26,6,${26/6},9,1",
+      s"1,8,Hello world,4,34,7,${34/7},9,1",
+      s"1,7,Hello world,5,41,8,${41/8},9,1",
+      s"2,5,Hello world,1,8,3,${8/3},5,1",
+      s"3,5,Hello world,1,8,3,${8/3},5,1")
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }
 
@@ -715,19 +715,19 @@ class OverWindowITCase(mode: StateBackendMode) extends StreamingWithStateTestBas
     env.execute()
 
     val expected = List(
-      s"2,1,Hello,1,1,${1.0/1},1,1",
-      s"1,1,Hello,7,4,${7.0/4},3,1",
-      s"1,2,Hello,7,4,${7.0/4},3,1",
-      s"1,3,Hello world,7,4,${7.0/4},3,1",
-      s"2,2,Hello world,12,6,${12.0/6},3,1",
-      s"2,3,Hello world,12,6,${12.0/6},3,1",
-      s"1,1,Hi,13,7,${13.0/7},3,1",
-      s"1,4,Hello world,17,8,${17.0/8},4,1",
-      s"1,5,Hello world,35,11,${35.0/11},7,1",
-      s"1,6,Hello world,35,11,${35.0/11},7,1",
-      s"1,7,Hello world,35,11,${35.0/11},7,1",
-      s"2,4,Hello world,44,13,${44.0/13},7,1",
-      s"2,5,Hello world,44,13,${44.0/13},7,1")
+      s"2,1,Hello,1,1,${1/1},1,1",
+      s"1,1,Hello,7,4,${7/4},3,1",
+      s"1,2,Hello,7,4,${7/4},3,1",
+      s"1,3,Hello world,7,4,${7/4},3,1",
+      s"2,2,Hello world,12,6,${12/6},3,1",
+      s"2,3,Hello world,12,6,${12/6},3,1",
+      s"1,1,Hi,13,7,${13/7},3,1",
+      s"1,4,Hello world,17,8,${17/8},4,1",
+      s"1,5,Hello world,35,11,${35/11},7,1",
+      s"1,6,Hello world,35,11,${35/11},7,1",
+      s"1,7,Hello world,35,11,${35/11},7,1",
+      s"2,4,Hello world,44,13,${44/13},7,1",
+      s"2,5,Hello world,44,13,${44/13},7,1")
 
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }
@@ -770,14 +770,14 @@ class OverWindowITCase(mode: StateBackendMode) extends StreamingWithStateTestBas
     env.execute()
 
     val expected = mutable.MutableList(
-      s"2,2,Hello,2,1,${2.0/1},2,2",
-      s"3,5,Hello,7,2,${7.0/2},5,2",
-      s"1,3,Hello,10,3,${10.0/3},5,2",
-      s"3,7,Hello world,17,4,${17.0/4},7,2",
-      s"1,1,Hi,18,5,${18.0/5},7,1",
-      s"4,9,Hello world,27,6,${27.0/6},9,1",
-      s"5,8,Hello world,35,7,${35.0/7},9,1",
-      s"6,8,Hello world,43,8,${43.0/8},9,1")
+      s"2,2,Hello,2,1,${2/1},2,2",
+      s"3,5,Hello,7,2,${7/2},5,2",
+      s"1,3,Hello,10,3,${10/3},5,2",
+      s"3,7,Hello world,17,4,${17/4},7,2",
+      s"1,1,Hi,18,5,${18/5},7,1",
+      s"4,9,Hello world,27,6,${27/6},9,1",
+      s"5,8,Hello world,35,7,${35/7},9,1",
+      s"6,8,Hello world,43,8,${43/8},9,1")
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }
 
@@ -837,20 +837,20 @@ class OverWindowITCase(mode: StateBackendMode) extends StreamingWithStateTestBas
     env.execute()
 
     val expected = List(
-      s"1,2,Hello,2,1,${2.0/1},2,2",
-      s"1,3,Hello world,5,2,${5.0/2},3,2",
-      s"1,1,Hi,6,3,${6.0/3},3,1",
-      s"2,1,Hello,1,1,${1.0/1},1,1",
-      s"2,2,Hello world,3,2,${3.0/2},2,1",
-      s"3,1,Hello,1,1,${1.0/1},1,1",
-      s"3,2,Hello world,3,2,${3.0/2},2,1",
-      s"1,5,Hello world,11,4,${11.0/4},5,1",
-      s"1,6,Hello world,17,5,${17.0/5},6,1",
-      s"1,9,Hello world,26,6,${26.0/6},9,1",
-      s"1,8,Hello world,34,7,${34.0/7},9,1",
-      s"1,7,Hello world,41,8,${41.0/8},9,1",
-      s"2,5,Hello world,8,3,${8.0/3},5,1",
-      s"3,5,Hello world,8,3,${8.0/3},5,1"
+      s"1,2,Hello,2,1,${2/1},2,2",
+      s"1,3,Hello world,5,2,${5/2},3,2",
+      s"1,1,Hi,6,3,${6/3},3,1",
+      s"2,1,Hello,1,1,${1/1},1,1",
+      s"2,2,Hello world,3,2,${3/2},2,1",
+      s"3,1,Hello,1,1,${1/1},1,1",
+      s"3,2,Hello world,3,2,${3/2},2,1",
+      s"1,5,Hello world,11,4,${11/4},5,1",
+      s"1,6,Hello world,17,5,${17/5},6,1",
+      s"1,9,Hello world,26,6,${26/6},9,1",
+      s"1,8,Hello world,34,7,${34/7},9,1",
+      s"1,7,Hello world,41,8,${41/8},9,1",
+      s"2,5,Hello world,8,3,${8/3},5,1",
+      s"3,5,Hello world,8,3,${8/3},5,1"
     )
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/RankITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/RankITCase.scala
index ceeeed3..a422c06 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/RankITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/RankITCase.scala
@@ -1149,10 +1149,10 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode
     env.execute()
 
     val expected1 = List(
-      "book,1,25,12.5,1",
-      "book,2,19,19.0,2",
-      "fruit,3,44,44.0,1",
-      "fruit,4,33,33.0,2")
+      "book,1,25,12,1",
+      "book,2,19,19,2",
+      "fruit,3,44,44,1",
+      "fruit,4,33,33,2")
     assertEquals(expected1.sorted, sink1.getRetractResults.sorted)
 
     val expected2 = List(
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
index 6f7df33..a5f35f5 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
@@ -111,8 +111,8 @@ class SplitAggregateITCase(
     t1.toRetractStream[Row].addSink(sink)
     env.execute()
 
-    val expected = List("1,3,2,1.5", "2,29,5,3.625",
-      "3,10,2,5.0", "4,21,3,5.25")
+    val expected = List("1,3,2,1", "2,29,5,3",
+      "3,10,2,5", "4,21,3,5")
     assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
 
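The split-aggregate expectations above assume that decomposing AVG into
partial (sum, count) pairs and dividing once at the end preserves the
integral result. A rough sketch of that recombination, with illustrative
names that are not the planner's internals:

    final case class PartialAvg(sum: Long, count: Long) {
      def merge(other: PartialAvg): PartialAvg =
        PartialAvg(sum + other.sum, count + other.count)
      // Division happens once, at the end, in the input's integral type.
      def result: Option[Long] =
        if (count == 0) None else Some(sum / count)
    }

    object SplitAvgSketch {
      def main(args: Array[String]): Unit = {
        val partials = Seq(PartialAvg(21L, 2L), PartialAvg(0L, 1L))
        println(partials.reduce(_ merge _).result) // Some(7), i.e. 21 / 3
      }
    }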
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala
index f2c29c6..758f653 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala
@@ -231,9 +231,9 @@ class AggregateITCase(mode: StateBackendMode) extends StreamingWithStateTestBase
     env.execute()
 
     val expected = mutable.MutableList(
-      s"0,1,${1.0/1},1", s"7,1,${9.0/2},2", s"2,1,${6.0/2},2",
-      s"3,2,${11.0/3},3", s"1,2,${10.0/3},3", s"14,2,${5.0/1},1",
-      s"12,3,${5.0/1},1", s"5,3,${8.0/2},2")
+      "0,1,1,1", "7,1,4,2", "2,1,3,2",
+      "3,2,3,3", "1,2,3,3", "14,2,5,1",
+      "12,3,5,1", "5,3,4,2")
     assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/GroupWindowITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/GroupWindowITCase.scala
index 35d64ae..031f1d6 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/GroupWindowITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/GroupWindowITCase.scala
@@ -82,7 +82,7 @@ class GroupWindowITCase(mode: StateBackendMode)
     windowedTable.toAppendStream[Row].addSink(sink)
     env.execute()
 
-    val expected = Seq(s"Hello world,2,${6.0/2},12,3,2", s"Hello,2,${4.0/2},3,2,2")
+    val expected = Seq("Hello world,2,3,12,3,2", "Hello,2,2,3,2,2")
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }
 
@@ -119,9 +119,9 @@ class GroupWindowITCase(mode: StateBackendMode)
     env.execute()
 
     val expected = Seq(
-      "Hello World,1970-01-01T00:00:00.014,1,9.0,9,9,1",
-      "Hello,1970-01-01T00:00:00.021,1,16.0,16,16,1",
-      s"Hello,1970-01-01T00:00:00.013,4,${15.0/4},5,5,4")
+      "Hello World,1970-01-01T00:00:00.014,1,9,9,9,1",
+      "Hello,1970-01-01T00:00:00.021,1,16,16,16,1",
+      "Hello,1970-01-01T00:00:00.013,4,3,5,5,4")
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }
 
@@ -146,7 +146,7 @@ class GroupWindowITCase(mode: StateBackendMode)
     windowedTable.toAppendStream[Row].addSink(sink)
     env.execute()
 
-    val expected = Seq(s"2,${3.0/2},1,1,2", s"2,${5.0/2},6,2,2")
+    val expected = Seq("2,1,1,1,2", "2,2,6,2,2")
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/MiniBatchGroupWindowITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/MiniBatchGroupWindowITCase.scala
index cb66334..3ade387 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/MiniBatchGroupWindowITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/MiniBatchGroupWindowITCase.scala
@@ -86,10 +86,10 @@ class MiniBatchGroupWindowITCase(miniBatch: MiniBatchMode, mode: StateBackendMod
     env.execute()
 
     val expected = Seq(
-      "Hello world,1,3.0,16,3,3,3,3,1970-01-01T00:00:00.015,1970-01-01T00:00:00.020,1",
-      "Hello world,1,3.0,8,3,3,3,3,1970-01-01T00:00:00.005,1970-01-01T00:00:00.010,1",
-      s"Hello,2,${4.0/2},3,2,2,2,4,1970-01-01T00:00,1970-01-01T00:00:00.005,2",
-      "Hi,1,1.0,1,1,1,1,1,1970-01-01T00:00,1970-01-01T00:00:00.005,1")
+      "Hello world,1,3,16,3,3,3,3,1970-01-01T00:00:00.015,1970-01-01T00:00:00.020,1",
+      "Hello world,1,3,8,3,3,3,3,1970-01-01T00:00:00.005,1970-01-01T00:00:00.010,1",
+      "Hello,2,2,3,2,2,2,4,1970-01-01T00:00,1970-01-01T00:00:00.005,2",
+      "Hi,1,1,1,1,1,1,1,1970-01-01T00:00,1970-01-01T00:00:00.005,1")
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }
 
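The next hunk drops an explicit DOUBLE() cast inside an array() constructor:
because avg on a BIGINT column now also yields BIGINT, it already shares an
element type with max. A rough plain-Scala analogue of why no widening cast
is needed (an assumed illustration, not the Table API itself):

    object ArrayElementTypeSketch {
      def main(args: Array[String]): Unit = {
        val b = Seq(1L, 2L, 3L)
        val avg: Long = b.sum / b.size // was Double before this change
        val max: Long = b.max
        val arr = Array(avg, max)      // Array[Long]; no cast required
        println(arr.mkString("[", ", ", "]")) // [2, 3]
      }
    }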
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/OverWindowITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/OverWindowITCase.scala
index 141a4d2..7fc3692 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/OverWindowITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/OverWindowITCase.scala
@@ -154,7 +154,7 @@ class OverWindowITCase(mode: StateBackendMode) extends StreamingWithStateTestBas
         countFun('b) over 'w,
         (countFun('b) over 'w) + 1,
         plusOne(countFun('b) over 'w),
-        array('b.avg over 'w, 'b.cast(DataTypes.DOUBLE()).max over 'w),
+        array('b.avg over 'w, 'b.max over 'w),
         'b.avg over 'w,
         'b.max over 'w,
         'b.min over 'w,
@@ -167,19 +167,19 @@ class OverWindowITCase(mode: StateBackendMode) extends StreamingWithStateTestBas
     env.execute()
 
     val expected = mutable.MutableList(
-      "1,1,Hello,6,SUM:6,3,4,4,[2.0, 3.0],2.0,3,1,1,2,2",
-      "1,2,Hello,6,SUM:6,3,4,4,[2.0, 3.0],2.0,3,1,1,2,2",
-      "1,3,Hello world,6,SUM:6,3,4,4,[2.0, 3.0],2.0,3,1,1,2,2",
-      "1,1,Hi,7,SUM:7,4,5,5,[1.75, 3.0],1.75,3,1,1,1,3",
-      "2,1,Hello,1,SUM:1,1,2,2,[1.0, 1.0],1.0,1,1,1,1,1",
-      "2,2,Hello world,6,SUM:6,3,4,4,[2.0, 3.0],2.0,3,1,1,2,2",
-      "2,3,Hello world,6,SUM:6,3,4,4,[2.0, 3.0],2.0,3,1,1,2,2",
-      "1,4,Hello world,11,SUM:11,5,6,6,[2.2, 4.0],2.2,4,1,1,2,3",
-      "1,5,Hello world,29,SUM:29,8,9,9,[3.625, 7.0],3.625,7,1,1,3,3",
-      "1,6,Hello world,29,SUM:29,8,9,9,[3.625, 7.0],3.625,7,1,1,3,3",
-      "1,7,Hello world,29,SUM:29,8,9,9,[3.625, 7.0],3.625,7,1,1,3,3",
-      "2,4,Hello world,15,SUM:15,5,6,6,[3.0, 5.0],3.0,5,1,1,3,2",
-      "2,5,Hello world,15,SUM:15,5,6,6,[3.0, 5.0],3.0,5,1,1,3,2"
+      "1,1,Hello,6,SUM:6,3,4,4,[2, 3],2,3,1,1,2,2",
+      "1,2,Hello,6,SUM:6,3,4,4,[2, 3],2,3,1,1,2,2",
+      "1,3,Hello world,6,SUM:6,3,4,4,[2, 3],2,3,1,1,2,2",
+      "1,1,Hi,7,SUM:7,4,5,5,[1, 3],1,3,1,1,1,3",
+      "2,1,Hello,1,SUM:1,1,2,2,[1, 1],1,1,1,1,1,1",
+      "2,2,Hello world,6,SUM:6,3,4,4,[2, 3],2,3,1,1,2,2",
+      "2,3,Hello world,6,SUM:6,3,4,4,[2, 3],2,3,1,1,2,2",
+      "1,4,Hello world,11,SUM:11,5,6,6,[2, 4],2,4,1,1,2,3",
+      "1,5,Hello world,29,SUM:29,8,9,9,[3, 7],3,7,1,1,3,3",
+      "1,6,Hello world,29,SUM:29,8,9,9,[3, 7],3,7,1,1,3,3",
+      "1,7,Hello world,29,SUM:29,8,9,9,[3, 7],3,7,1,1,3,3",
+      "2,4,Hello world,15,SUM:15,5,6,6,[3, 5],3,5,1,1,3,2",
+      "2,5,Hello world,15,SUM:15,5,6,6,[3, 5],3,5,1,1,3,2"
     )
 
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
@@ -431,8 +431,8 @@ class OverWindowITCase(mode: StateBackendMode) extends StreamingWithStateTestBas
     env.execute()
 
     val expected = mutable.MutableList(
-      "Hello World,20,2,2.0", "Hello World,7,1,1.0", "Hello,1,1,1.0",
-      "Hello,2,2,2.0", "Hello,6,3,3.0")
+      "Hello World,20,2,2", "Hello World,7,1,1", "Hello,1,1,1",
+      "Hello,2,2,2", "Hello,6,3,3")
 
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }
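
The test-utility diff that follows tightens the resultTypeConvert contract:
integral inputs now receive the already-divided Long (or a BigInteger for
the Long variant) instead of a Double. BigInteger.divide truncates toward
zero, so the semantics stay integral even when the running sum no longer
fits in a Long. A small sketch of that property:

    import java.math.BigInteger

    object BigIntegralAvgSketch {
      def main(args: Array[String]): Unit = {
        val sum = BigInteger.valueOf(Long.MaxValue).multiply(BigInteger.valueOf(2))
        val count = BigInteger.valueOf(3L)
        // 18446744073709551614 / 3, truncated toward zero:
        println(sum.divide(count))             // 6148914691236517204
        println(sum.divide(count).longValue()) // fits back into a Long
      }
    }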
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/AvgAggFunction.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/AvgAggFunction.scala
index fc88cff..89dc893 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/AvgAggFunction.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/AvgAggFunction.scala
@@ -69,7 +69,7 @@ abstract class IntegralAvgAggFunction[T] extends AggregateFunction[T, IntegralAv
     if (acc.f1 == 0) {
       null.asInstanceOf[T]
     } else {
-      resultTypeConvert(acc.f0.doubleValue() / acc.f1)
+      resultTypeConvert(acc.f0 / acc.f1)
     }
   }
 
@@ -98,28 +98,28 @@ abstract class IntegralAvgAggFunction[T] extends AggregateFunction[T, IntegralAv
     *              the intermediate result to avoid the overflow by sum operation.
     * @return the result value with the expected aggregation result type
     */
-  def resultTypeConvert(value: Double): T
+  def resultTypeConvert(value: Long): T
 }
 
 /**
   * Built-in Byte Avg aggregate function
   */
-class ByteAvgAggFunction extends IntegralAvgAggFunction[Double] {
-  override def resultTypeConvert(value: Double): Double = value
+class ByteAvgAggFunction extends IntegralAvgAggFunction[Byte] {
+  override def resultTypeConvert(value: Long): Byte = value.toByte
 }
 
 /**
   * Built-in Short Avg aggregate function
   */
-class ShortAvgAggFunction extends IntegralAvgAggFunction[Double] {
-  override def resultTypeConvert(value: Double): Double = value
+class ShortAvgAggFunction extends IntegralAvgAggFunction[Short] {
+  override def resultTypeConvert(value: Long): Short = value.toShort
 }
 
 /**
   * Built-in Int Avg aggregate function
   */
-class IntAvgAggFunction extends IntegralAvgAggFunction[Double] {
-  override def resultTypeConvert(value: Double): Double = value
+class IntAvgAggFunction extends IntegralAvgAggFunction[Int] {
+  override def resultTypeConvert(value: Long): Int = value.toInt
 }
 
 /** The initial accumulator for Big Integral Avg aggregate function */
@@ -161,7 +161,7 @@ abstract class BigIntegralAvgAggFunction[T]
     if (acc.f1 == 0) {
       null.asInstanceOf[T]
     } else {
-      resultTypeConvert(acc.f0.doubleValue() / acc.f1)
+      resultTypeConvert(acc.f0.divide(BigInteger.valueOf(acc.f1)))
     }
   }
 
@@ -191,14 +191,14 @@ abstract class BigIntegralAvgAggFunction[T]
     *              operation.
     * @return the result value with the expected aggregation result type
     */
-  def resultTypeConvert(value: Double): T
+  def resultTypeConvert(value: BigInteger): T
 }
 
 /**
   * Built-in Long Avg aggregate function
   */
-class LongAvgAggFunction extends BigIntegralAvgAggFunction[Double] {
-  override def resultTypeConvert(value: Double): Double = value
+class LongAvgAggFunction extends BigIntegralAvgAggFunction[Long] {
+  override def resultTypeConvert(value: BigInteger): Long = value.longValue()
 }
 
 /** The initial accumulator for Floating Avg aggregate function */