You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2022/07/23 04:44:53 UTC

[flink] branch master updated: [FLINK-27618][sql] Flink SQL supports CUME_DIST function (#19727)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8017a417515 [FLINK-27618][sql] Flink SQL supports CUME_DIST function (#19727)
8017a417515 is described below

commit 8017a417515cb87b1d15ef1295ecac73126b40f8
Author: yuxia Luo <lu...@alumni.sjtu.edu.cn>
AuthorDate: Sat Jul 23 12:44:46 2022 +0800

    [FLINK-27618][sql] Flink SQL supports CUME_DIST function (#19727)
---
 docs/data/sql_functions.yml                        |  2 +
 docs/data/sql_functions_zh.yml                     |  2 +
 .../aggfunctions/CumeDistAggFunction.java          | 86 ++++++++++++++++++++++
 .../aggfunctions/SizeBasedWindowFunction.java}     | 27 ++++---
 .../functions/sql/FlinkSqlOperatorTable.java       |  1 +
 .../nodes/exec/batch/BatchExecOverAggregate.java   | 32 +++++---
 .../exec/batch/BatchExecOverAggregateBase.java     | 19 ++++-
 .../table/planner/codegen/agg/AggCodeGen.scala     |  2 +
 .../codegen/agg/AggsHandlerCodeGenerator.scala     | 44 ++++++++++-
 .../codegen/agg/DeclarativeAggCodeGen.scala        | 19 ++++-
 .../planner/codegen/agg/DistinctAggCodeGen.scala   |  5 ++
 .../planner/codegen/agg/ImperativeAggCodeGen.scala |  6 ++
 .../planner/plan/utils/AggFunctionFactory.scala    | 11 +++
 .../runtime/batch/sql/OverAggregateITCase.scala    | 50 ++++++++++++-
 .../runtime/generated/AggsHandleFunction.java      | 12 +++
 .../operators/over/frame/InsensitiveOverFrame.java |  1 +
 .../operators/over/frame/OffsetOverFrame.java      |  1 +
 .../operators/over/frame/SlidingOverFrame.java     |  1 +
 .../over/frame/UnboundedFollowingOverFrame.java    |  1 +
 .../over/frame/UnboundedOverWindowFrame.java       |  1 +
 .../over/frame/UnboundedPrecedingOverFrame.java    |  1 +
 .../operators/over/SumAggsHandleFunction.java      |  3 +
 22 files changed, 298 insertions(+), 29 deletions(-)

diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index 9589f76edec..bc2a13dd82a 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -1005,6 +1005,8 @@ aggregate:
     description: Returns the last value in an ordered set of values.
   - sql: LISTAGG(expression [, separator])
     description: Concatenates the values of string expressions and places separator values between them. The separator is not added at the end of string. The default value of separator is ','.
+  - sql: CUME_DIST()
+    description: Returns the cumulative distribution of a value in a group of values. The result is the number of rows preceding or equal to the current row in the ordering of the partition, divided by the number of rows in the window partition.
 
 catalog:
   - sql: CURRENT_DATABASE()
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index 2f63d985857..85baa7ebd50 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -1096,6 +1096,8 @@ aggregate:
     description: 返回一组有序值中的最后一个值。
   - sql: LISTAGG(expression [, separator])
     description: 连接字符串表达式的值并在它们之间放置分隔符值。字符串末尾不添加分隔符时则分隔符的默认值为“,”。
+  - sql: CUME_DIST()
+    description: 返回值在一组值的累积分布。结果是小于或等于当前行的值的行数除以窗口分区的总行数。
 
 catalog:
   - sql: CURRENT_DATABASE()
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/CumeDistAggFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/CumeDistAggFunction.java
new file mode 100644
index 00000000000..bc864dde921
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/CumeDistAggFunction.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.aggfunctions;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.table.types.DataType;
+
+import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.cast;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.div;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.literal;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.plus;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.typeLiteral;
+
+/** Built-in CUME_DIST aggregate function. */
+public class CumeDistAggFunction extends DeclarativeAggregateFunction
+        implements SizeBasedWindowFunction {
+
+    private final UnresolvedReferenceExpression sequence = unresolvedRef("seq");
+
+    @Override
+    public int operandCount() {
+        return 0;
+    }
+
+    @Override
+    public UnresolvedReferenceExpression[] aggBufferAttributes() {
+        return new UnresolvedReferenceExpression[] {sequence};
+    }
+
+    @Override
+    public DataType[] getAggBufferTypes() {
+        return new DataType[] {DataTypes.INT()};
+    }
+
+    @Override
+    public DataType getResultType() {
+        return DataTypes.DOUBLE();
+    }
+
+    @Override
+    public Expression[] initialValuesExpressions() {
+        return new Expression[] {literal(0, DataTypes.INT())};
+    }
+
+    @Override
+    public Expression[] accumulateExpressions() {
+        return new Expression[] {plus(sequence, literal(1, DataTypes.INT()))};
+    }
+
+    @Override
+    public Expression[] retractExpressions() {
+        throw new TableException("This function does not support retraction.");
+    }
+
+    @Override
+    public Expression[] mergeExpressions() {
+        throw new TableException("This function does not support merge.");
+    }
+
+    @Override
+    public Expression getValueExpression() {
+        return div(
+                cast(sequence, typeLiteral(DataTypes.DOUBLE())),
+                cast(windowSizeAttribute(), typeLiteral(DataTypes.DOUBLE())));
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/AggsHandleFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SizeBasedWindowFunction.java
similarity index 52%
copy from flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/AggsHandleFunction.java
copy to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SizeBasedWindowFunction.java
index 43339d8875a..b2fabb478fd 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/AggsHandleFunction.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SizeBasedWindowFunction.java
@@ -16,24 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.runtime.generated;
+package org.apache.flink.table.planner.functions.aggfunctions;
 
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.expressions.LocalReferenceExpression;
+
+import static org.apache.flink.table.expressions.ApiExpressionUtils.localRef;
 
 /**
- * The base class for handling aggregate functions.
- *
- * <p>It is code generated to handle all {@link AggregateFunction}s together in an aggregation.
+ * Some functions like CUME_DIST/PERCENT_RANK/NTILE need the size of the current window for their
+ * calculation. Such functions need to implement this interface to get access to the window size.
  *
- * <p>It is the entry point for aggregate operators to operate all {@link AggregateFunction}s.
+ * <p>NOTE: Now, it can only be used by {@link DeclarativeAggregateFunction}.
  */
-public interface AggsHandleFunction extends AggsHandleFunctionBase {
+public interface SizeBasedWindowFunction {
 
-    /**
-     * Gets the result of the aggregation from the current accumulators.
-     *
-     * @return the final result (saved in a row) of the current accumulators.
-     */
-    RowData getValue() throws Exception;
+    /** The field for the window size. */
+    default LocalReferenceExpression windowSizeAttribute() {
+        return localRef("window_size", DataTypes.INT());
+    }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index f85ad65f2a2..4051f6e9181 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -1138,6 +1138,7 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
     public static final SqlAggFunction RANK = SqlStdOperatorTable.RANK;
     public static final SqlAggFunction DENSE_RANK = SqlStdOperatorTable.DENSE_RANK;
     public static final SqlAggFunction ROW_NUMBER = SqlStdOperatorTable.ROW_NUMBER;
+    public static final SqlAggFunction CUME_DIST = SqlStdOperatorTable.CUME_DIST;
     public static final SqlAggFunction LEAD = SqlStdOperatorTable.LEAD;
     public static final SqlAggFunction LAG = SqlStdOperatorTable.LAG;
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecOverAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecOverAggregate.java
index 4fc5786fb48..6060a07e2c0 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecOverAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecOverAggregate.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.planner.codegen.over.MultiFieldRangeBoundComparato
 import org.apache.flink.table.planner.codegen.over.RangeBoundComparatorCodeGenerator;
 import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
 import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.functions.aggfunctions.SizeBasedWindowFunction;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
@@ -70,6 +71,7 @@ import org.apache.calcite.rex.RexWindowBound;
 import org.apache.calcite.sql.SqlKind;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -290,6 +292,10 @@ public class BatchExecOverAggregate extends BatchExecOverAggregateBase {
                                 relBuilder,
                                 JavaScalaConversionUtil.toScala(inputType.getChildren()),
                                 false); // copyInputField
+                if (Arrays.stream(aggInfoList.aggInfos())
+                        .anyMatch(f -> f.function() instanceof SizeBasedWindowFunction)) {
+                    generator.needWindowSize();
+                }
                 // over agg code gen must pass the constants
                 GeneratedAggsHandleFunction genAggsHandler =
                         generator
@@ -436,17 +442,21 @@ public class BatchExecOverAggregate extends BatchExecOverAggregateBase {
         return overSpec.getGroups().stream()
                 .anyMatch(
                         group -> {
-                            OverWindowMode mode = inferGroupMode(group);
-                            switch (mode) {
-                                case INSENSITIVE:
-                                    return false;
-                                case ROW:
-                                    return (!group.getLowerBound().isCurrentRow()
-                                                    || !group.getUpperBound().isCurrentRow())
-                                            && (!group.getLowerBound().isUnbounded()
-                                                    || !group.getUpperBound().isCurrentRow());
-                                default:
-                                    return true;
+                            if (containSizeBasedWindowFunction(group)) {
+                                return true;
+                            } else {
+                                OverWindowMode mode = inferGroupMode(group);
+                                switch (mode) {
+                                    case INSENSITIVE:
+                                        return false;
+                                    case ROW:
+                                        return (!group.getLowerBound().isCurrentRow()
+                                                        || !group.getUpperBound().isCurrentRow())
+                                                && (!group.getLowerBound().isUnbounded()
+                                                        || !group.getUpperBound().isCurrentRow());
+                                    default:
+                                        return true;
+                                }
                             }
                         });
     }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecOverAggregateBase.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecOverAggregateBase.java
index 860d40b34e1..437ab6a80d9 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecOverAggregateBase.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecOverAggregateBase.java
@@ -39,6 +39,10 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import static org.apache.calcite.sql.SqlKind.CUME_DIST;
+import static org.apache.calcite.sql.SqlKind.NTILE;
+import static org.apache.calcite.sql.SqlKind.PERCENT_RANK;
+
 /** Batch {@link ExecNode} base class for sort-based over window aggregate. */
 public abstract class BatchExecOverAggregateBase extends ExecNodeBase<RowData>
         implements InputSortedExecNode<RowData>, SingleTransformationTranslator<RowData> {
@@ -99,6 +103,15 @@ public abstract class BatchExecOverAggregateBase extends ExecNodeBase<RowData>
         return overSpec.getConstants();
     }
 
+    protected boolean containSizeBasedWindowFunction(GroupSpec group) {
+        return group.getAggCalls().stream()
+                .anyMatch(
+                        agg ->
+                                (agg.getAggregation().getKind() == CUME_DIST)
+                                        || (agg.getAggregation().getKind() == PERCENT_RANK)
+                                        || (agg.getAggregation().getKind() == NTILE));
+    }
+
     /** Infer the over window mode based on given group info. */
     protected OverWindowMode inferGroupMode(GroupSpec group) {
         AggregateCall aggCall = group.getAggCalls().get(0);
@@ -111,6 +124,10 @@ public abstract class BatchExecOverAggregateBase extends ExecNodeBase<RowData>
         } else {
             if (aggCall.getAggregation() instanceof SqlLeadLagAggFunction) {
                 return OverWindowMode.OFFSET;
+            } else if (aggCall.getAggregation().getKind() == CUME_DIST) {
+                // CUME_DIST uses range mode (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),
+                // because equal values in a window partition should return the same result.
+                return OverWindowMode.RANGE;
             } else {
                 return OverWindowMode.INSENSITIVE;
             }
@@ -130,7 +147,7 @@ public abstract class BatchExecOverAggregateBase extends ExecNodeBase<RowData>
         OFFSET,
         /**
          * The INSENSITIVE mode does not care the window framing without LEAD/LAG agg function, for
-         * example RANK/DENSE_RANK/PERCENT_RANK/CUME_DIST/ROW_NUMBER.
+         * example RANK/DENSE_RANK/PERCENT_RANK/ROW_NUMBER.
          */
         INSENSITIVE
     }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggCodeGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggCodeGen.scala
index 228366978b3..cbfbd6ff514 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggCodeGen.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggCodeGen.scala
@@ -33,6 +33,8 @@ trait AggCodeGen {
 
   def resetAccumulator(generator: ExprCodeGenerator): String
 
+  def setWindowSize(generator: ExprCodeGenerator): String
+
   def accumulate(generator: ExprCodeGenerator): String
 
   def retract(generator: ExprCodeGenerator): String
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala
index f54015bfc83..79300c8e26b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala
@@ -18,7 +18,7 @@
 package org.apache.flink.table.planner.codegen.agg
 
 import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.{DataTypes, TableException}
 import org.apache.flink.table.data.GenericRowData
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.ImperativeAggregateFunction
@@ -86,6 +86,7 @@ class AggsHandlerCodeGenerator(
   private var isAccumulateNeeded = false
   private var isRetractNeeded = false
   private var isMergeNeeded = false
+  private var isWindowSizeNeeded = false
 
   var valueType: RowType = _
 
@@ -189,6 +190,11 @@ class AggsHandlerCodeGenerator(
     this
   }
 
+  def needWindowSize(): AggsHandlerCodeGenerator = {
+    this.isWindowSizeNeeded = true
+    this
+  }
+
   /** Adds window properties such as window_start, window_end */
   private def initialWindowProperties(
       windowProperties: Seq[WindowProperty],
@@ -320,6 +326,7 @@ class AggsHandlerCodeGenerator(
     initialAggregateInformation(aggInfoList)
 
     // generates all methods body first to add necessary reuse code to context
+    val setWindowSizeCode = genSetWindowSize()
     val createAccumulatorsCode = genCreateAccumulators()
     val getAccumulatorsCode = genGetAccumulators()
     val setAccumulatorsCode = genSetAccumulators()
@@ -353,6 +360,11 @@ class AggsHandlerCodeGenerator(
             ${ctx.reuseOpenCode()}
           }
 
+          @Override
+          public void setWindowSize(int $WINDOWS_SIZE) {
+            $setWindowSizeCode
+          }
+
           @Override
           public void accumulate($ROW_DATA $ACCUMULATE_INPUT_TERM) throws Exception {
             $accumulateCode
@@ -847,6 +859,35 @@ class AggsHandlerCodeGenerator(
       ctx.tableConfig)
   }
 
+  private def genSetWindowSize(): String = {
+    // The generated method 'setWindowSize' is always called in OverWindowFrame#prepare,
+    // no matter whether the window size is needed or not. If the window size is not needed,
+    // the generated method body is empty and the call does nothing.
+    // So, please make sure to call 'needWindowSize()' (which sets 'isWindowSizeNeeded' = true)
+    // if the window size is needed.
+    if (isWindowSizeNeeded) {
+      val methodName = "setWindowSize"
+      ctx.startNewLocalVariableStatement(methodName)
+
+      val exprGenerator = new ExprCodeGenerator(ctx, INPUT_NOT_NULL)
+        .bindInput(DataTypes.INT().getLogicalType, WINDOWS_SIZE)
+      val body = aggBufferCodeGens
+        // ignore distinct agg codegen
+        .filter(agg => !agg.isInstanceOf[DistinctAggCodeGen])
+        .map(_.setWindowSize(exprGenerator))
+        .mkString("\n")
+
+      s"""
+         |${ctx.reuseLocalVariableCode(methodName)}
+         |${ctx.reuseInputUnboxingCode(WINDOWS_SIZE)}
+         |${ctx.reusePerRecordCode()}
+         |$body
+         |""".stripMargin
+    } else {
+      ""
+    }
+  }
+
   private def genCreateAccumulators(): String = {
     val methodName = "createAccumulators"
     ctx.startNewLocalVariableStatement(methodName)
@@ -1208,6 +1249,7 @@ object AggsHandlerCodeGenerator {
   val MERGED_ACC_TERM = "otherAcc"
   val ACCUMULATE_INPUT_TERM = "accInput"
   val RETRACT_INPUT_TERM = "retractInput"
+  val WINDOWS_SIZE = "windowSize"
   val DISTINCT_KEY_TERM = "distinctKey"
 
   val NAMESPACE_TERM = "namespace"
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/DeclarativeAggCodeGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/DeclarativeAggCodeGen.scala
index 36eff937a53..b74dd8957b9 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/DeclarativeAggCodeGen.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/DeclarativeAggCodeGen.scala
@@ -25,7 +25,7 @@ import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.DISTI
 import org.apache.flink.table.planner.expressions.{DeclarativeExpressionResolver, RexNodeExpression}
 import org.apache.flink.table.planner.expressions.DeclarativeExpressionResolver.{toRexDistinctKey, toRexInputRef}
 import org.apache.flink.table.planner.expressions.converter.ExpressionConverter
-import org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction
+import org.apache.flink.table.planner.functions.aggfunctions.{DeclarativeAggregateFunction, SizeBasedWindowFunction}
 import org.apache.flink.table.planner.plan.utils.AggregateInfo
 import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.{fromDataTypeToLogicalType, fromLogicalTypeToDataType}
 import org.apache.flink.table.types.logical.LogicalType
@@ -77,6 +77,14 @@ class DeclarativeAggCodeGen(
 
   private val rexNodeGen = new ExpressionConverter(relBuilder)
 
+  private val windowSizeTerm = function match {
+    case f: SizeBasedWindowFunction =>
+      val exprCodegen = new ExprCodeGenerator(ctx, false)
+      exprCodegen.generateExpression(f.windowSizeAttribute().accept(rexNodeGen))
+      f.windowSizeAttribute().getName
+    case _ => null
+  }
+
   private val bufferNullTerms = {
     val exprCodegen = new ExprCodeGenerator(ctx, false)
     bufferTerms
@@ -284,4 +292,13 @@ class DeclarativeAggCodeGen(
       needEmitValue: Boolean = false): Unit = {
     // skip the check for DeclarativeAggregateFunction for now
   }
+
+  override def setWindowSize(generator: ExprCodeGenerator): String = {
+    function match {
+      case _: SizeBasedWindowFunction =>
+        s"""this.$windowSizeTerm = ${generator.input1Term};"""
+      case _ =>
+        ""
+    }
+  }
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/DistinctAggCodeGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/DistinctAggCodeGen.scala
index dd3c23fa979..74681b7fe18 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/DistinctAggCodeGen.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/DistinctAggCodeGen.scala
@@ -955,4 +955,9 @@ class DistinctAggCodeGen(
        """.stripMargin
     }
   }
+
+  override def setWindowSize(generator: ExprCodeGenerator): String = {
+    throw new TableException(
+      "Distinct shouldn't set window size, this is a bug, please file a issue.")
+  }
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/ImperativeAggCodeGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/ImperativeAggCodeGen.scala
index 097da7b57a9..533c956d3a3 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/ImperativeAggCodeGen.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/ImperativeAggCodeGen.scala
@@ -502,4 +502,10 @@ class ImperativeAggCodeGen(
     val accTerm = if (isAccTypeInternal) accInternalTerm else accExternalTerm
     s"$functionTerm.emitValue($accTerm, $MEMBER_COLLECTOR_TERM);"
   }
+
+  override def setWindowSize(generator: ExprCodeGenerator): String = {
+    // Currently, setting the window size is not supported for ImperativeAggregateFunction,
+    // so return an empty string directly.
+    ""
+  }
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
index 74f3a2dcaaa..284131bff9d 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
@@ -98,6 +98,13 @@ class AggFunctionFactory(
       case a: SqlRankFunction if a.getKind == SqlKind.DENSE_RANK =>
         createDenseRankAggFunction(argTypes)
 
+      case a: SqlRankFunction if a.getKind == SqlKind.CUME_DIST =>
+        if (isBounded) {
+          createCumeDistAggFunction(argTypes)
+        } else {
+          throw new TableException("CUME_DIST Function is not supported in stream mode.")
+        }
+
       case func: SqlLeadLagAggFunction =>
         if (isBounded) {
           createBatchLeadLagAggFunction(argTypes, index)
@@ -502,6 +509,10 @@ class AggFunctionFactory(
     new RowNumberAggFunction
   }
 
+  private def createCumeDistAggFunction(argTypes: Array[LogicalType]): UserDefinedFunction = {
+    new CumeDistAggFunction
+  }
+
   private def createRankAggFunction(argTypes: Array[LogicalType]): UserDefinedFunction = {
     val argTypes = orderKeyIndexes.map(inputRowType.getChildren.get(_))
     new RankAggFunction(argTypes)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala
index eb4d32ac64c..d627e32a12f 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala
@@ -266,7 +266,6 @@ class OverAggregateITCase extends BatchTestBase {
 
   @Test
   def testWindowAggregationRank2(): Unit = {
-
     checkResult(
       "SELECT d, e, rank() over (order by e desc), dense_rank() over (order by e desc) FROM Table5",
       Seq(
@@ -2759,6 +2758,55 @@ class OverAggregateITCase extends BatchTestBase {
       "select dep,name,rank() over (partition by dep order by salary desc) as rnk from emp",
       Seq(row("1", "A", 2), row("1", "B", 1), row("2", "C", 1)))
   }
+
+  @Test
+  def testCumeDist(): Unit = {
+    checkResult(
+      "SELECT f, CUME_DIST() over (order by e desc)," +
+        " CUME_DIST() over (partition by d order by e) " +
+        " FROM Table5",
+      Seq(
+        row(0, 1.0, 1.0),
+        row(1, 0.9333333333333333, 0.5),
+        row(2, 0.8666666666666667, 1.0),
+        row(3, 0.8, 0.3333333333333333),
+        row(4, 0.7333333333333333, 0.6666666666666666),
+        row(5, 0.6666666666666666, 1.0),
+        row(6, 0.6, 0.25),
+        row(7, 0.5333333333333333, 0.5),
+        row(8, 0.4666666666666667, 0.75),
+        row(9, 0.4, 1.0),
+        row(10, 0.3333333333333333, 0.2),
+        row(11, 0.26666666666666666, 0.4),
+        row(12, 0.2, 0.6),
+        row(13, 0.13333333333333333, 0.8),
+        row(14, 0.06666666666666667, 1.0)
+      )
+    )
+
+    // test values of order-key containing duplicates
+    checkResult(
+      "SELECT f, CUME_DIST() over (order by d), CUME_DIST() over (order by d desc)" +
+        " FROM Table5",
+      Seq(
+        row(13, 1.0, 0.3333333333333333),
+        row(12, 1.0, 0.3333333333333333),
+        row(14, 1.0, 0.3333333333333333),
+        row(11, 1.0, 0.3333333333333333),
+        row(10, 1.0, 0.3333333333333333),
+        row(9, 0.6666666666666666, 0.6),
+        row(6, 0.6666666666666666, 0.6),
+        row(8, 0.6666666666666666, 0.6),
+        row(7, 0.6666666666666666, 0.6),
+        row(4, 0.4, 0.8),
+        row(5, 0.4, 0.8),
+        row(3, 0.4, 0.8),
+        row(1, 0.2, 0.9333333333333333),
+        row(2, 0.2, 0.9333333333333333),
+        row(0, 0.06666666666666667, 1.0)
+      )
+    )
+  }
 }
 
 /** The initial accumulator for count aggregate function */
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/AggsHandleFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/AggsHandleFunction.java
index 43339d8875a..816ab7b62e2 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/AggsHandleFunction.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/AggsHandleFunction.java
@@ -36,4 +36,16 @@ public interface AggsHandleFunction extends AggsHandleFunctionBase {
      * @return the final result (saved in a row) of the current accumulators.
      */
     RowData getValue() throws Exception;
+
+    /**
+     * Sets the window size for the aggregate function. It is used in batch scenarios to pass the
+     * total number of rows of the current window frame. More information about window frames:
+     * https://docs.oracle.com/cd/E17952_01/mysql-8.0-en/window-functions-frames.html
+     *
+     * <p>The value is needed because some window functions require the total number of rows of the
+     * current window frame for their calculation. For example, the function PERCENT_RANK needs to
+     * know the window's size, and the SQL looks like: <code>
+     * SELECT PERCENT_RANK() OVER ([partition_by_clause] order_by_clause)</code>.
+     */
+    void setWindowSize(int windowSize);
 }
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/frame/InsensitiveOverFrame.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/frame/InsensitiveOverFrame.java
index 3d1d423ed26..0f4383e9b78 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/frame/InsensitiveOverFrame.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/frame/InsensitiveOverFrame.java
@@ -49,6 +49,7 @@ public class InsensitiveOverFrame implements OverWindowFrame {
 
     @Override
     public void prepare(ResettableExternalBuffer rows) throws Exception {
+        processor.setWindowSize(rows.size());
         // reset the accumulator value
         processor.setAccumulators(processor.createAccumulators());
     }
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/frame/OffsetOverFrame.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/frame/OffsetOverFrame.java
index d59c11ecaa3..8f2e3bf8e71 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/frame/OffsetOverFrame.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/frame/OffsetOverFrame.java
@@ -74,6 +74,7 @@ public class OffsetOverFrame implements OverWindowFrame {
 
     @Override
     public void prepare(ResettableExternalBuffer rows) throws Exception {
+        processor.setWindowSize(rows.size());
         // reset the accumulator value
         processor.setAccumulators(processor.createAccumulators());
         currentBufferLength = rows.size();
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/frame/SlidingOverFrame.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/frame/SlidingOverFrame.java
index 2ffd514b799..693ec1031c4 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/frame/SlidingOverFrame.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/frame/SlidingOverFrame.java
@@ -78,6 +78,7 @@ public abstract class SlidingOverFrame implements OverWindowFrame {
         inputIterator = rows.newIterator();
         nextRow = OverWindowFrame.getNextOrNull(inputIterator);
         buffer.clear();
+        processor.setWindowSize(rows.size());
         // cleanup the retired accumulators value
         processor.setAccumulators(processor.createAccumulators());
     }
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/frame/UnboundedFollowingOverFrame.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/frame/UnboundedFollowingOverFrame.java
index 2e89031f090..59589008539 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/frame/UnboundedFollowingOverFrame.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/frame/UnboundedFollowingOverFrame.java
@@ -70,6 +70,7 @@ public abstract class UnboundedFollowingOverFrame implements OverWindowFrame {
     @Override
     public void prepare(ResettableExternalBuffer rows) throws Exception {
         input = rows;
+        processor.setWindowSize(rows.size());
         // cleanup the retired accumulators value
         processor.setAccumulators(processor.createAccumulators());
         inputIndex = 0;
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/frame/UnboundedOverWindowFrame.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/frame/UnboundedOverWindowFrame.java
index 7f50014cbde..b3665d182d8 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/frame/UnboundedOverWindowFrame.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/frame/UnboundedOverWindowFrame.java
@@ -59,6 +59,7 @@ public class UnboundedOverWindowFrame implements OverWindowFrame {
 
     @Override
     public void prepare(ResettableExternalBuffer rows) throws Exception {
+        processor.setWindowSize(rows.size());
         // cleanup the retired accumulators value
         processor.setAccumulators(processor.createAccumulators());
         ResettableExternalBuffer.BufferIterator iterator = rows.newIterator();
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/frame/UnboundedPrecedingOverFrame.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/frame/UnboundedPrecedingOverFrame.java
index bb4ba389540..ccea60651cf 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/frame/UnboundedPrecedingOverFrame.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/frame/UnboundedPrecedingOverFrame.java
@@ -64,6 +64,7 @@ public abstract class UnboundedPrecedingOverFrame implements OverWindowFrame {
         if (inputIterator.advanceNext()) {
             nextRow = inputIterator.getRow().copy();
         }
+        processor.setWindowSize(rows.size());
         // reset the accumulators value
         processor.setAccumulators(processor.createAccumulators());
     }
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/SumAggsHandleFunction.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/SumAggsHandleFunction.java
index f61f0b2054c..46ee4b424f2 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/SumAggsHandleFunction.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/SumAggsHandleFunction.java
@@ -76,6 +76,9 @@ public class SumAggsHandleFunction implements AggsHandleFunction {
         return getAccumulators();
     }
 
+    @Override
+    public void setWindowSize(int windowSize) {}
+
     @Override
     public void cleanup() throws Exception {}