You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by go...@apache.org on 2022/09/15 10:30:05 UTC

[flink] 02/03: [FLINK-28569][table-planner] Move non-deterministic test functions to userDefinedScalarFunctions

This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7df115c016cab326627e4c12b2a4c449c1794e95
Author: lincoln.lil <li...@gmail.com>
AuthorDate: Thu Sep 8 18:11:15 2022 +0800

    [FLINK-28569][table-planner] Move non-deterministic test functions to userDefinedScalarFunctions
    
    This closes #20791
---
 .../utils/userDefinedScalarFunctions.scala         | 53 +++++++++++++++++-
 .../plan/stream/sql/NonDeterministicDagTest.scala  | 64 ++--------------------
 2 files changed, 58 insertions(+), 59 deletions(-)

diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/userDefinedScalarFunctions.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/userDefinedScalarFunctions.scala
index d45a71ba40c..e1468bead66 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/userDefinedScalarFunctions.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/userDefinedScalarFunctions.scala
@@ -20,7 +20,8 @@ package org.apache.flink.table.planner.expressions.utils
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.annotation.DataTypeHint
 import org.apache.flink.table.api.Types
-import org.apache.flink.table.functions.{FunctionContext, ScalarFunction}
+import org.apache.flink.table.functions.{AggregateFunction, FunctionContext, ScalarFunction, TableFunction}
+import org.apache.flink.table.planner.utils.CountAccumulator
 import org.apache.flink.types.Row
 
 import org.apache.commons.lang3.StringUtils
@@ -232,3 +233,53 @@ class SplitUDF(deterministic: Boolean) extends ScalarFunction {
   override def isDeterministic: Boolean = deterministic
 
 }
+
+@SerialVersionUID(1L)
+class TestNonDeterministicUdf extends ScalarFunction {
+  val random = new Random()
+
+  def eval(id: JLong): JLong = {
+    id + random.nextInt()
+  }
+
+  def eval(id: Int): Int = {
+    id + random.nextInt()
+  }
+
+  def eval(id: String): String = {
+    s"$id-${random.nextInt()}"
+  }
+
+  override def isDeterministic: Boolean = false
+}
+
+@SerialVersionUID(1L)
+class TestNonDeterministicUdtf extends TableFunction[String] {
+
+  val random = new Random()
+
+  def eval(id: Int): Unit = {
+    collect(s"${id + random.nextInt()}")
+  }
+
+  def eval(id: String): Unit = {
+    id.split(",").foreach(str => collect(s"$str#${random.nextInt()}"))
+  }
+
+  override def isDeterministic: Boolean = false
+}
+
+class TestNonDeterministicUdaf extends AggregateFunction[JLong, CountAccumulator] {
+
+  val random = new Random()
+
+  def accumulate(acc: CountAccumulator, in: JLong): Unit = {
+    acc.f0 += (in + random.nextInt())
+  }
+
+  override def getValue(acc: CountAccumulator): JLong = acc.f0
+
+  override def createAccumulator(): CountAccumulator = new CountAccumulator
+
+  override def isDeterministic: Boolean = false
+}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.scala
index c86bb1c07e4..dbd44247dd0 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.scala
@@ -26,10 +26,10 @@ import org.apache.flink.table.api.config.{ExecutionConfigOptions, OptimizerConfi
 import org.apache.flink.table.api.config.OptimizerConfigOptions.NonDeterministicUpdateStrategy
 import org.apache.flink.table.api.internal.TableEnvironmentInternal
 import org.apache.flink.table.data.RowData
-import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction}
-import org.apache.flink.table.planner.{JBoolean, JLong}
+import org.apache.flink.table.planner.JBoolean
+import org.apache.flink.table.planner.expressions.utils.{TestNonDeterministicUdaf, TestNonDeterministicUdf, TestNonDeterministicUdtf}
 import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.StringSplit
-import org.apache.flink.table.planner.utils.{CountAccumulator, StreamTableTestUtil, TableTestBase}
+import org.apache.flink.table.planner.utils.{StreamTableTestUtil, TableTestBase}
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
 import org.apache.flink.table.sinks.UpsertStreamTableSink
 import org.apache.flink.table.types.DataType
@@ -42,8 +42,6 @@ import org.junit.runners.Parameterized
 
 import java.util
 
-import scala.util.Random
-
 @RunWith(classOf[Parameterized])
 class NonDeterministicDagTest(nonDeterministicUpdateStrategy: NonDeterministicUpdateStrategy)
   extends TableTestBase {
@@ -212,9 +210,9 @@ class NonDeterministicDagTest(nonDeterministicUpdateStrategy: NonDeterministicUp
                                |)""".stripMargin)
 
     // custom ND function
-    util.tableEnv.createTemporaryFunction("ndFunc", TestNonDeterministicUdf)
-    util.tableEnv.createTemporaryFunction("ndTableFunc", TestNonDeterministicUdtf)
-    util.tableEnv.createTemporaryFunction("ndAggFunc", TestTestNonDeterministicUdaf)
+    util.tableEnv.createTemporaryFunction("ndFunc", new TestNonDeterministicUdf)
+    util.tableEnv.createTemporaryFunction("ndTableFunc", new TestNonDeterministicUdtf)
+    util.tableEnv.createTemporaryFunction("ndAggFunc", new TestNonDeterministicUdaf)
     // deterministic table function
     util.tableEnv.createTemporaryFunction("str_split", new StringSplit())
   }
@@ -1561,56 +1559,6 @@ class NonDeterministicDagTest(nonDeterministicUpdateStrategy: NonDeterministicUp
     util.verifyExecPlan(stmtSet)
   }
 
-  @SerialVersionUID(1L)
-  object TestNonDeterministicUdf extends ScalarFunction {
-    val random = new Random()
-
-    def eval(id: JLong): JLong = {
-      id + random.nextInt()
-    }
-
-    def eval(id: Int): Int = {
-      id + random.nextInt()
-    }
-
-    def eval(id: String): String = {
-      s"$id-${random.nextInt()}"
-    }
-
-    override def isDeterministic: Boolean = false
-  }
-
-  @SerialVersionUID(1L)
-  object TestNonDeterministicUdtf extends TableFunction[String] {
-
-    val random = new Random()
-
-    def eval(id: Int): Unit = {
-      collect(s"${id + random.nextInt()}")
-    }
-
-    def eval(id: String): Unit = {
-      id.split(",").foreach(str => collect(s"$str#${random.nextInt()}"))
-    }
-
-    override def isDeterministic: Boolean = false
-  }
-
-  object TestTestNonDeterministicUdaf extends AggregateFunction[JLong, CountAccumulator] {
-
-    val random = new Random()
-
-    def accumulate(acc: CountAccumulator, in: JLong): Unit = {
-      acc.f0 += (in + random.nextInt())
-    }
-
-    override def getValue(acc: CountAccumulator): JLong = acc.f0
-
-    override def createAccumulator(): CountAccumulator = new CountAccumulator
-
-    override def isDeterministic: Boolean = false
-  }
-
   /**
    * This upsert test sink does support getting primary key from table schema. We defined a similar
    * test sink here not using existing {@link TestingUpsertTableSink} in {@link StreamTestSink}