You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by go...@apache.org on 2022/09/15 10:30:05 UTC
[flink] 02/03: [FLINK-28569][table-planner] Move non-deterministic test functions to userDefinedScalarFunctions
This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7df115c016cab326627e4c12b2a4c449c1794e95
Author: lincoln.lil <li...@gmail.com>
AuthorDate: Thu Sep 8 18:11:15 2022 +0800
[FLINK-28569][table-planner] Move non-deterministic test functions to userDefinedScalarFunctions
This closes #20791
---
.../utils/userDefinedScalarFunctions.scala | 53 +++++++++++++++++-
.../plan/stream/sql/NonDeterministicDagTest.scala | 64 ++--------------------
2 files changed, 58 insertions(+), 59 deletions(-)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/userDefinedScalarFunctions.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/userDefinedScalarFunctions.scala
index d45a71ba40c..e1468bead66 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/userDefinedScalarFunctions.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/userDefinedScalarFunctions.scala
@@ -20,7 +20,8 @@ package org.apache.flink.table.planner.expressions.utils
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.api.Types
-import org.apache.flink.table.functions.{FunctionContext, ScalarFunction}
+import org.apache.flink.table.functions.{AggregateFunction, FunctionContext, ScalarFunction, TableFunction}
+import org.apache.flink.table.planner.utils.CountAccumulator
import org.apache.flink.types.Row
import org.apache.commons.lang3.StringUtils
@@ -232,3 +233,53 @@ class SplitUDF(deterministic: Boolean) extends ScalarFunction {
override def isDeterministic: Boolean = deterministic
}
+
+@SerialVersionUID(1L)
+class TestNonDeterministicUdf extends ScalarFunction {
+ val random = new Random()
+
+ def eval(id: JLong): JLong = {
+ id + random.nextInt()
+ }
+
+ def eval(id: Int): Int = {
+ id + random.nextInt()
+ }
+
+ def eval(id: String): String = {
+ s"$id-${random.nextInt()}"
+ }
+
+ override def isDeterministic: Boolean = false
+}
+
+@SerialVersionUID(1L)
+class TestNonDeterministicUdtf extends TableFunction[String] {
+
+ val random = new Random()
+
+ def eval(id: Int): Unit = {
+ collect(s"${id + random.nextInt()}")
+ }
+
+ def eval(id: String): Unit = {
+ id.split(",").foreach(str => collect(s"$str#${random.nextInt()}"))
+ }
+
+ override def isDeterministic: Boolean = false
+}
+
+class TestNonDeterministicUdaf extends AggregateFunction[JLong, CountAccumulator] {
+
+ val random = new Random()
+
+ def accumulate(acc: CountAccumulator, in: JLong): Unit = {
+ acc.f0 += (in + random.nextInt())
+ }
+
+ override def getValue(acc: CountAccumulator): JLong = acc.f0
+
+ override def createAccumulator(): CountAccumulator = new CountAccumulator
+
+ override def isDeterministic: Boolean = false
+}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.scala
index c86bb1c07e4..dbd44247dd0 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.scala
@@ -26,10 +26,10 @@ import org.apache.flink.table.api.config.{ExecutionConfigOptions, OptimizerConfi
import org.apache.flink.table.api.config.OptimizerConfigOptions.NonDeterministicUpdateStrategy
import org.apache.flink.table.api.internal.TableEnvironmentInternal
import org.apache.flink.table.data.RowData
-import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction}
-import org.apache.flink.table.planner.{JBoolean, JLong}
+import org.apache.flink.table.planner.JBoolean
+import org.apache.flink.table.planner.expressions.utils.{TestNonDeterministicUdaf, TestNonDeterministicUdf, TestNonDeterministicUdtf}
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.StringSplit
-import org.apache.flink.table.planner.utils.{CountAccumulator, StreamTableTestUtil, TableTestBase}
+import org.apache.flink.table.planner.utils.{StreamTableTestUtil, TableTestBase}
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
import org.apache.flink.table.sinks.UpsertStreamTableSink
import org.apache.flink.table.types.DataType
@@ -42,8 +42,6 @@ import org.junit.runners.Parameterized
import java.util
-import scala.util.Random
-
@RunWith(classOf[Parameterized])
class NonDeterministicDagTest(nonDeterministicUpdateStrategy: NonDeterministicUpdateStrategy)
extends TableTestBase {
@@ -212,9 +210,9 @@ class NonDeterministicDagTest(nonDeterministicUpdateStrategy: NonDeterministicUp
|)""".stripMargin)
// custom ND function
- util.tableEnv.createTemporaryFunction("ndFunc", TestNonDeterministicUdf)
- util.tableEnv.createTemporaryFunction("ndTableFunc", TestNonDeterministicUdtf)
- util.tableEnv.createTemporaryFunction("ndAggFunc", TestTestNonDeterministicUdaf)
+ util.tableEnv.createTemporaryFunction("ndFunc", new TestNonDeterministicUdf)
+ util.tableEnv.createTemporaryFunction("ndTableFunc", new TestNonDeterministicUdtf)
+ util.tableEnv.createTemporaryFunction("ndAggFunc", new TestNonDeterministicUdaf)
// deterministic table function
util.tableEnv.createTemporaryFunction("str_split", new StringSplit())
}
@@ -1561,56 +1559,6 @@ class NonDeterministicDagTest(nonDeterministicUpdateStrategy: NonDeterministicUp
util.verifyExecPlan(stmtSet)
}
- @SerialVersionUID(1L)
- object TestNonDeterministicUdf extends ScalarFunction {
- val random = new Random()
-
- def eval(id: JLong): JLong = {
- id + random.nextInt()
- }
-
- def eval(id: Int): Int = {
- id + random.nextInt()
- }
-
- def eval(id: String): String = {
- s"$id-${random.nextInt()}"
- }
-
- override def isDeterministic: Boolean = false
- }
-
- @SerialVersionUID(1L)
- object TestNonDeterministicUdtf extends TableFunction[String] {
-
- val random = new Random()
-
- def eval(id: Int): Unit = {
- collect(s"${id + random.nextInt()}")
- }
-
- def eval(id: String): Unit = {
- id.split(",").foreach(str => collect(s"$str#${random.nextInt()}"))
- }
-
- override def isDeterministic: Boolean = false
- }
-
- object TestTestNonDeterministicUdaf extends AggregateFunction[JLong, CountAccumulator] {
-
- val random = new Random()
-
- def accumulate(acc: CountAccumulator, in: JLong): Unit = {
- acc.f0 += (in + random.nextInt())
- }
-
- override def getValue(acc: CountAccumulator): JLong = acc.f0
-
- override def createAccumulator(): CountAccumulator = new CountAccumulator
-
- override def isDeterministic: Boolean = false
- }
-
/**
* This upsert test sink does support getting primary key from table schema. We defined a similar
* test sink here not using existing {@link TestingUpsertTableSink} in {@link StreamTestSink}