Posted to commits@flink.apache.org by go...@apache.org on 2022/09/04 13:36:12 UTC

[flink] branch master updated: [FLINK-29091][table-planner] Fix the determinism declaration of the rand function to be consistent with the current behavior

This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b15bc900eb [FLINK-29091][table-planner] Fix the determinism declaration of the rand function to be consistent with the current behavior
4b15bc900eb is described below

commit 4b15bc900eb60b1830bc406975ce974ad6050f98
Author: lincoln.lil <li...@gmail.com>
AuthorDate: Wed Aug 24 18:00:09 2022 +0800

    [FLINK-29091][table-planner] Fix the determinism declaration of the rand function to be consistent with the current behavior
    
    This closes #20674
---
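Reviewer note: the core effect of this change can be illustrated with a small, self-contained model. The sketch below is not Flink or Calcite code; `ReductionSketch`, `ConstantCall`, and `reduce` are hypothetical names introduced only for illustration. It assumes a planner that folds a call with constant operands into a literal only when the function declares itself deterministic, which is the behavior the new `RAND` and `RAND_INTEGER` definitions rely on.

```java
import java.util.Random;
import java.util.function.Supplier;

/** Toy model of constant expression reduction. Hypothetical names; not Flink or Calcite code. */
final class ReductionSketch {

    /** Hypothetical stand-in for a function call whose operands are all constants. */
    static final class ConstantCall {
        final String sql;
        final boolean deterministic;
        final Supplier<Object> evaluator;

        ConstantCall(String sql, boolean deterministic, Supplier<Object> evaluator) {
            this.sql = sql;
            this.deterministic = deterministic;
            this.evaluator = evaluator;
        }
    }

    /** Folds the call into a literal at "planning" time only if it is declared deterministic. */
    static String reduce(ConstantCall call) {
        return call.deterministic ? String.valueOf(call.evaluator.get()) : call.sql;
    }

    public static void main(String[] args) {
        ConstantCall abs = new ConstantCall("ABS(-1)", true, () -> Math.abs(-1));
        ConstantCall rand = new ConstantCall("RAND(1)", false, () -> new Random(1).nextDouble());
        System.out.println(reduce(abs));  // prints 1
        System.out.println(reduce(rand)); // prints RAND(1)
    }
}
```

In this model the seeded call `RAND(1)` stays in the plan unreduced, which mirrors the expected plan in the new `testExpressionReductionWithRand` case.
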
 docs/content.zh/docs/dev/table/functions/udfs.md   | 53 ++++++++++++++++++++++
 docs/content/docs/dev/table/functions/udfs.md      | 48 +++++++++++++++++++-
 .../functions/sql/FlinkSqlOperatorTable.java       | 44 +++++++++++++++++-
 .../rules/logical/ExpressionReductionRulesTest.xml | 17 +++++++
 .../expressions/NonDeterministicTests.scala        |  9 ++++
 .../logical/ExpressionReductionRulesTest.scala     |  6 +++
 6 files changed, 173 insertions(+), 4 deletions(-)
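
Reviewer note: the documentation added in this commit distinguishes deterministic, non-deterministic, and dynamic system functions, and describes when each is evaluated in batch and streaming mode. The sketch below restates that rule as a lookup for a call with constant operands. `EvaluationPhaseSketch` and its enums are hypothetical names, and the model is a simplification of the described behavior, not the planner's implementation.

```java
/** Toy classification of when a call with constant operands is evaluated. Hypothetical names. */
final class EvaluationPhaseSketch {

    enum Mode { BATCH, STREAMING }

    enum Kind { DETERMINISTIC, NON_DETERMINISTIC, DYNAMIC }

    enum Phase { PLANNING, RUNTIME_PER_RECORD }

    /** Maps a function kind and execution mode to the phase described in the docs. */
    static Phase phaseFor(Kind kind, Mode mode) {
        switch (kind) {
            case DETERMINISTIC:
                // Constant calls to deterministic functions can be pre-evaluated.
                return Phase.PLANNING;
            case DYNAMIC:
                // Dynamic functions are fixed at query start in batch mode only.
                return mode == Mode.BATCH ? Phase.PLANNING : Phase.RUNTIME_PER_RECORD;
            default:
                // Non-deterministic functions are evaluated per record in both modes.
                return Phase.RUNTIME_PER_RECORD;
        }
    }

    public static void main(String[] args) {
        System.out.println(phaseFor(Kind.DYNAMIC, Mode.BATCH));           // prints PLANNING
        System.out.println(phaseFor(Kind.DYNAMIC, Mode.STREAMING));       // prints RUNTIME_PER_RECORD
        System.out.println(phaseFor(Kind.NON_DETERMINISTIC, Mode.BATCH)); // prints RUNTIME_PER_RECORD
    }
}
```

Under this reading, `CURRENT_TIMESTAMP` would fall under `DYNAMIC` and `RAND` under `NON_DETERMINISTIC`.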

diff --git a/docs/content.zh/docs/dev/table/functions/udfs.md b/docs/content.zh/docs/dev/table/functions/udfs.md
index 6a3da037d07..ab619d06868 100644
--- a/docs/content.zh/docs/dev/table/functions/udfs.md
+++ b/docs/content.zh/docs/dev/table/functions/udfs.md
@@ -585,6 +585,59 @@ public static class LiteralFunction extends ScalarFunction {
 For more examples of custom type inference, see also the `flink-examples-table` module with
 {{< gh_link file="flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/functions/AdvancedFunctionsExample.java" name="advanced function implementation" >}}.
 
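+### Determinism
+
+Every user-defined function class can declare whether it produces deterministic results by overriding the `isDeterministic()` method. If the function is not purely functional (like `random()`, `date()`, or `now()`), the method must return `false`. By default, `isDeterministic()` returns `true`.
+
+Furthermore, overriding the `isDeterministic()` method might also influence the runtime behavior. A runtime implementation might be called at two different stages:
+
+1. **During planning**: If a function is called with constant expressions, or constant expressions can be derived from the given statement, the function is pre-evaluated for constant expression reduction and might no longer be executed on the cluster,
+unless `isDeterministic()` is overridden to return `false` to disable constant expression reduction in this case. For example, the following calls to `ABS` are executed during planning: `SELECT ABS(-1) FROM t` and `SELECT ABS(field) FROM t WHERE field = -1`; whereas `SELECT ABS(field) FROM t` is not.
+
+2. **During runtime (i.e. cluster execution)**: If a function is called with non-constant expressions or `isDeterministic()` returns `false`.
+
+#### System (Built-in) Function Determinism
+The determinism of system (built-in) functions is immutable. There are two kinds of functions that are not deterministic: dynamic functions and non-deterministic functions, according to Apache Calcite's `SqlOperator` definition: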
+```java
+  /**
+   * Returns whether a call to this operator is guaranteed to always return
+   * the same result given the same operands; true is assumed by default.
+   */
+  public boolean isDeterministic() {
+    return true;
+  }
+
+  /**
+   * Returns whether it is unsafe to cache query plans referencing this
+   * operator; false is assumed by default.
+   */
+  public boolean isDynamicFunction() {
+    return false;
+  }
+```
+
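+`isDeterministic` indicates the determinism of a function; if it returns `false`, the function is evaluated per record during runtime.
+`isDynamicFunction` returning `true` implies that the function can only be evaluated at query start: in batch mode it is executed only during planning,
+while in streaming mode it is equivalent to a non-deterministic function, because the query is logically executed continuously (the streaming-mode abstraction of a [continuous query over dynamic tables]({{< ref "docs/dev/table/concepts/dynamic_tables" >}}#dynamic-tables-amp-continuous-queries)), so dynamic functions are also re-evaluated for each query execution (equivalent to per record in the current implementation).
+
+The following system functions are always non-deterministic (evaluated per record during runtime in both batch and streaming mode):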
+- UUID
+- RAND
+- RAND_INTEGER
+- CURRENT_DATABASE
+- UNIX_TIMESTAMP
+- CURRENT_ROW_TIMESTAMP
+
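+The following system temporal functions are dynamic: in batch mode they are executed during planning (query start), and in streaming mode they are evaluated per record during runtime: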
+- CURRENT_DATE
+- CURRENT_TIME
+- CURRENT_TIMESTAMP
+- NOW
+- LOCALTIME
+- LOCALTIMESTAMP
+
+Note: `isDynamicFunction` is only applicable to system functions.
+
 ### Runtime Integration
 -------------------
 
diff --git a/docs/content/docs/dev/table/functions/udfs.md b/docs/content/docs/dev/table/functions/udfs.md
index 59a38739af3..683a691d768 100644
--- a/docs/content/docs/dev/table/functions/udfs.md
+++ b/docs/content/docs/dev/table/functions/udfs.md
@@ -605,16 +605,60 @@ the method must return `false`. By default, `isDeterministic()` returns `true`.
 Furthermore, the `isDeterministic()` method might also influence the runtime behavior. A runtime
 implementation might be called at two different stages:
 
-**During planning (i.e. pre-flight phase)**: If a function is called with constant expressions
+**1. During planning (i.e. pre-flight phase)**: If a function is called with constant expressions
 or constant expressions can be derived from the given statement, a function is pre-evaluated
 for constant expression reduction and might not be executed on the cluster anymore. Unless
 `isDeterministic()` is used to disable constant expression reduction in this case. For example,
 the following calls to `ABS` are executed during planning: `SELECT ABS(-1) FROM t` and
 `SELECT ABS(field) FROM t WHERE field = -1`; whereas `SELECT ABS(field) FROM t` is not.
 
-**During runtime (i.e. cluster execution)**: If a function is called with non-constant expressions
+**2. During runtime (i.e. cluster execution)**: If a function is called with non-constant expressions
 or `isDeterministic()` returns `false`.
 
+#### System (Built-in) Function Determinism
+The determinism of system (built-in) functions is immutable. There are two kinds of functions that are not deterministic:
+dynamic functions and non-deterministic functions, according to Apache Calcite's `SqlOperator` definition:
+```java
+  /**
+   * Returns whether a call to this operator is guaranteed to always return
+   * the same result given the same operands; true is assumed by default.
+   */
+  public boolean isDeterministic() {
+    return true;
+  }
+
+  /**
+   * Returns whether it is unsafe to cache query plans referencing this
+   * operator; false is assumed by default.
+   */
+  public boolean isDynamicFunction() {
+    return false;
+  }
+```
+`isDeterministic` indicates the determinism of a function; if it returns `false`, the function is evaluated per record during runtime.
+`isDynamicFunction` returning `true` implies that the function can only be evaluated at query start:
+in batch mode it is pre-evaluated only during planning, while in streaming mode it is equivalent to a non-deterministic
+function because the query is logically executed continuously (the abstraction of a [continuous query over dynamic tables]({{< ref "docs/dev/table/concepts/dynamic_tables" >}}#dynamic-tables-amp-continuous-queries)),
+so dynamic functions are also re-evaluated for each query execution (equivalent to per record in the current implementation).
+
+The following system functions are always non-deterministic (evaluated per record during runtime in both batch and streaming mode):
+- UUID
+- RAND
+- RAND_INTEGER
+- CURRENT_DATABASE
+- UNIX_TIMESTAMP
+- CURRENT_ROW_TIMESTAMP
+
+The following system temporal functions are dynamic: they are pre-evaluated during planning (query start) in batch mode and evaluated per record in streaming mode:
+- CURRENT_DATE
+- CURRENT_TIME
+- CURRENT_TIMESTAMP
+- NOW
+- LOCALTIME
+- LOCALTIMESTAMP
+
+Note: `isDynamicFunction` is only applicable to system functions.
+
 ### Runtime Integration
 
 Sometimes it might be necessary for a user-defined function to get global runtime information or do some setup/clean-up work before the actual work. User-defined functions provide `open()` and `close()` methods that can be overridden and provide similar functionality as the methods in `RichFunction` of DataStream API.
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index cfdf34e2e79..223783ed7fa 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -39,6 +39,7 @@ import org.apache.calcite.sql.type.OperandTypes;
 import org.apache.calcite.sql.type.ReturnTypes;
 import org.apache.calcite.sql.type.SqlOperandCountRanges;
 import org.apache.calcite.sql.type.SqlReturnTypeInference;
+import org.apache.calcite.sql.type.SqlSingleOperandTypeChecker;
 import org.apache.calcite.sql.type.SqlTypeFamily;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.sql.type.SqlTypeTransforms;
@@ -577,6 +578,11 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
             new FlinkSqlTimestampFunction(
                     "CURRENT_ROW_TIMESTAMP", SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, 3) {
 
+                @Override
+                public boolean isDeterministic() {
+                    return false;
+                }
+
                 @Override
                 public SqlSyntax getSyntax() {
                     return SqlSyntax.FUNCTION;
@@ -876,6 +882,42 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 
     public static final SqlFunction TRY_CAST = new SqlTryCastFunction();
 
+    public static final SqlFunction RAND =
+            new SqlFunction(
+                    "RAND",
+                    SqlKind.OTHER_FUNCTION,
+                    ReturnTypes.DOUBLE,
+                    null,
+                    OperandTypes.or(
+                            new SqlSingleOperandTypeChecker[] {
+                                OperandTypes.NILADIC, OperandTypes.NUMERIC
+                            }),
+                    SqlFunctionCategory.NUMERIC) {
+
+                @Override
+                public boolean isDeterministic() {
+                    return false;
+                }
+            };
+
+    public static final SqlFunction RAND_INTEGER =
+            new SqlFunction(
+                    "RAND_INTEGER",
+                    SqlKind.OTHER_FUNCTION,
+                    ReturnTypes.INTEGER,
+                    null,
+                    OperandTypes.or(
+                            new SqlSingleOperandTypeChecker[] {
+                                OperandTypes.NUMERIC, OperandTypes.NUMERIC_NUMERIC
+                            }),
+                    SqlFunctionCategory.NUMERIC) {
+
+                @Override
+                public boolean isDeterministic() {
+                    return false;
+                }
+            };
+
     /** <code>AUXILIARY_GROUP</code> aggregate function. Only be used in internally. */
     public static final SqlAggFunction AUXILIARY_GROUP = new SqlAuxiliaryGroupAggFunction();
 
@@ -1108,8 +1150,6 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
     public static final SqlFunction RADIANS = SqlStdOperatorTable.RADIANS;
     public static final SqlFunction SIGN = SqlStdOperatorTable.SIGN;
     public static final SqlFunction PI = SqlStdOperatorTable.PI;
-    public static final SqlFunction RAND = SqlStdOperatorTable.RAND;
-    public static final SqlFunction RAND_INTEGER = SqlStdOperatorTable.RAND_INTEGER;
 
     // TIME FUNCTIONS
     public static final SqlFunction YEAR = SqlStdOperatorTable.YEAR;
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ExpressionReductionRulesTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ExpressionReductionRulesTest.xml
index ad52a39a921..6133fee2a66 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ExpressionReductionRulesTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ExpressionReductionRulesTest.xml
@@ -31,6 +31,23 @@ LogicalProject(EXPR$0=[PyUdf()], EXPR$1=[MyUdf(1)])
 Calc(select=[f0 AS EXPR$0, CAST(2 AS INTEGER) AS EXPR$1])
 +- PythonCalc(select=[PyUdf() AS f0])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testExpressionReductionWithRand">
+    <Resource name="sql">
+      <![CDATA[SELECT RAND(), RAND(), RAND(1), RAND(1), RAND_INTEGER(3), RAND_INTEGER(3) FROM MyTable]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(EXPR$0=[RAND()], EXPR$1=[RAND()], EXPR$2=[RAND(1)], EXPR$3=[RAND(1)], EXPR$4=[RAND_INTEGER(3)], EXPR$5=[RAND_INTEGER(3)])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[RAND() AS EXPR$0, RAND() AS EXPR$1, RAND(1) AS EXPR$2, RAND(1) AS EXPR$3, RAND_INTEGER(3) AS EXPR$4, RAND_INTEGER(3) AS EXPR$5])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala
index 10864d3f292..a9175dfe963 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala
@@ -178,6 +178,15 @@ class NonDeterministicTests extends ExpressionTestBase {
     testAllApis(uuid().charLength(), "CHARACTER_LENGTH(UUID())", "36")
   }
 
+  @Test
+  def testRand(): Unit = {
+    testSqlApi("RAND() <> RAND() or RAND() = RAND()", "TRUE")
+    testSqlApi("RAND(1) <> RAND(1) or RAND(1) = RAND(1)", "TRUE")
+    testSqlApi(
+      "RAND_INTEGER(10) <> RAND_INTEGER(10) or RAND_INTEGER(10) = RAND_INTEGER(10)",
+      "TRUE")
+  }
+
   // ----------------------------------------------------------------------------------------------
 
   override def testData: Row = new Row(0)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/ExpressionReductionRulesTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/ExpressionReductionRulesTest.scala
index cdd0e3f4975..166fb790213 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/ExpressionReductionRulesTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/ExpressionReductionRulesTest.scala
@@ -60,6 +60,12 @@ class ExpressionReductionRulesTest extends TableTestBase {
     util.addFunction("MyUdf", Func1)
     util.verifyExecPlan("SELECT PyUdf(), MyUdf(1) FROM MyTable")
   }
+
+  @Test
+  def testExpressionReductionWithRand(): Unit = {
+    util.verifyExecPlan(
+      "SELECT RAND(), RAND(), RAND(1), RAND(1), RAND_INTEGER(3), RAND_INTEGER(3) FROM MyTable")
+  }
 }
 
 @SerialVersionUID(1L)