You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2017/06/29 02:55:55 UTC
flink git commit: [FLINK-7014] [table] Expose isDeterministic
interface to UserDefinedFunction
Repository: flink
Updated Branches:
refs/heads/master 958d3762d -> b59148cf7
[FLINK-7014] [table] Expose isDeterministic interface to UserDefinedFunction
This closes #4200
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b59148cf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b59148cf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b59148cf
Branch: refs/heads/master
Commit: b59148cf7d206951191a24e6a6ce43937db5f045
Parents: 958d376
Author: Xpray <le...@gmail.com>
Authored: Wed Jun 28 20:16:56 2017 +0800
Committer: Jark Wu <ja...@apache.org>
Committed: Thu Jun 29 10:54:20 2017 +0800
----------------------------------------------------------------------
.../table/functions/UserDefinedFunction.scala | 8 ++++
.../table/functions/utils/AggSqlFunction.scala | 2 +
.../functions/utils/ScalarSqlFunction.scala | 1 +
.../functions/utils/TableSqlFunction.scala | 1 +
.../flink/table/ExpressionReductionTest.scala | 50 +++++++++++++++++++-
5 files changed, 61 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b59148cf/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
index e9e01ee..7c57ea0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
@@ -40,6 +40,14 @@ abstract class UserDefinedFunction extends Serializable {
@throws(classOf[Exception])
def close(): Unit = {}
+ /**
+ * @return true iff a call to this function is guaranteed to always return
+ * the same result given the same parameters; true is assumed by default
+ * if user's function is not pure functional, like random(), date(), now()...
+ * isDeterministic must return false
+ */
+ def isDeterministic: Boolean = true
+
final def functionIdentifier: String = {
val md5 = DigestUtils.md5Hex(serialize(this))
getClass.getCanonicalName.replace('.', '$').concat("$").concat(md5)
http://git-wip-us.apache.org/repos/asf/flink/blob/b59148cf/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
index 816bc52..be5501a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
@@ -57,6 +57,8 @@ class AggSqlFunction(
) {
def getFunction: AggregateFunction[_, _] = aggregateFunction
+
+ override def isDeterministic: Boolean = aggregateFunction.isDeterministic
}
object AggSqlFunction {
http://git-wip-us.apache.org/repos/asf/flink/blob/b59148cf/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
index bbfa3aa..b1b45cd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
@@ -52,6 +52,7 @@ class ScalarSqlFunction(
def getScalarFunction = scalarFunction
+ override def isDeterministic: Boolean = scalarFunction.isDeterministic
}
object ScalarSqlFunction {
http://git-wip-us.apache.org/repos/asf/flink/blob/b59148cf/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala
index 74f3374..b37d75b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala
@@ -69,6 +69,7 @@ class TableSqlFunction(
*/
def getPojoFieldMapping = functionImpl.fieldIndexes
+ override def isDeterministic: Boolean = udtf.isDeterministic
}
object TableSqlFunction {
http://git-wip-us.apache.org/repos/asf/flink/blob/b59148cf/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
index 59eff4b..2e9ee46 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
@@ -20,9 +20,10 @@ package org.apache.flink.table
import org.apache.flink.api.scala._
import org.apache.flink.table.api.Types
import org.apache.flink.table.api.scala._
+import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.utils.TableTestBase
import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
+import org.junit.{Ignore, Test}
class ExpressionReductionTest extends TableTestBase {
@@ -458,4 +459,51 @@ class ExpressionReductionTest extends TableTestBase {
util.verifySql(sqlQuery, expected)
}
+ // todo this NPE is caused by Calcite, it shall pass when [CALCITE-1860] is fixed
+ @Ignore
+ def testReduceDeterministicUDF(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+ // if isDeterministic = true, will cause a Calcite NPE, which will be fixed in [CALCITE-1860]
+ val result = table
+ .select('a, 'b, 'c, DeterministicNullFunc() as 'd)
+ .where("d.isNull")
+ .select('a, 'b, 'c)
+
+ val expected: String = streamTableNode(0)
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test
+ def testReduceNonDeterministicUDF(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+ val result = table
+ .select('a, 'b, 'c, NonDeterministicNullFunc() as 'd)
+ .where("d.isNull")
+ .select('a, 'b, 'c)
+
+ val expected = unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "b", "c"),
+ term("where", s"IS NULL(${NonDeterministicNullFunc.functionIdentifier}())")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+}
+
+object NonDeterministicNullFunc extends ScalarFunction {
+ def eval(): String = null
+ override def isDeterministic = false
+}
+
+object DeterministicNullFunc extends ScalarFunction {
+ def eval(): String = null
+ override def isDeterministic = true
}