You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2017/06/29 02:55:55 UTC

flink git commit: [FLINK-7014] [table] Expose isDeterministic interface to UserDefinedFunction

Repository: flink
Updated Branches:
  refs/heads/master 958d3762d -> b59148cf7


[FLINK-7014] [table] Expose isDeterministic interface to UserDefinedFunction

This closes #4200


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b59148cf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b59148cf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b59148cf

Branch: refs/heads/master
Commit: b59148cf7d206951191a24e6a6ce43937db5f045
Parents: 958d376
Author: Xpray <le...@gmail.com>
Authored: Wed Jun 28 20:16:56 2017 +0800
Committer: Jark Wu <ja...@apache.org>
Committed: Thu Jun 29 10:54:20 2017 +0800

----------------------------------------------------------------------
 .../table/functions/UserDefinedFunction.scala   |  8 ++++
 .../table/functions/utils/AggSqlFunction.scala  |  2 +
 .../functions/utils/ScalarSqlFunction.scala     |  1 +
 .../functions/utils/TableSqlFunction.scala      |  1 +
 .../flink/table/ExpressionReductionTest.scala   | 50 +++++++++++++++++++-
 5 files changed, 61 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b59148cf/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
index e9e01ee..7c57ea0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
@@ -40,6 +40,14 @@ abstract class UserDefinedFunction extends Serializable {
   @throws(classOf[Exception])
   def close(): Unit = {}
 
+  /**
+    * @return true iff a call to this function is guaranteed to always return
+    *         the same result given the same parameters; true is assumed by default
+    *         if user's function is not pure functional, like random(), date(), now()...
+    *         isDeterministic must return false
+    */
+  def isDeterministic: Boolean = true
+
   final def functionIdentifier: String = {
     val md5  =  DigestUtils.md5Hex(serialize(this))
     getClass.getCanonicalName.replace('.', '$').concat("$").concat(md5)

http://git-wip-us.apache.org/repos/asf/flink/blob/b59148cf/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
index 816bc52..be5501a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
@@ -57,6 +57,8 @@ class AggSqlFunction(
   ) {
 
   def getFunction: AggregateFunction[_, _] = aggregateFunction
+
+  override def isDeterministic: Boolean = aggregateFunction.isDeterministic
 }
 
 object AggSqlFunction {

http://git-wip-us.apache.org/repos/asf/flink/blob/b59148cf/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
index bbfa3aa..b1b45cd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
@@ -52,6 +52,7 @@ class ScalarSqlFunction(
 
   def getScalarFunction = scalarFunction
 
+  override def isDeterministic: Boolean = scalarFunction.isDeterministic
 }
 
 object ScalarSqlFunction {

http://git-wip-us.apache.org/repos/asf/flink/blob/b59148cf/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala
index 74f3374..b37d75b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala
@@ -69,6 +69,7 @@ class TableSqlFunction(
     */
   def getPojoFieldMapping = functionImpl.fieldIndexes
 
+  override def isDeterministic: Boolean = udtf.isDeterministic
 }
 
 object TableSqlFunction {

http://git-wip-us.apache.org/repos/asf/flink/blob/b59148cf/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
index 59eff4b..2e9ee46 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
@@ -20,9 +20,10 @@ package org.apache.flink.table
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.Types
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.functions.ScalarFunction
 import org.apache.flink.table.utils.TableTestBase
 import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
+import org.junit.{Ignore, Test}
 
 class ExpressionReductionTest extends TableTestBase {
 
@@ -458,4 +459,51 @@ class ExpressionReductionTest extends TableTestBase {
     util.verifySql(sqlQuery, expected)
   }
 
+  // todo this NPE is caused by Calcite, it shall pass when [CALCITE-1860] is fixed
+  @Ignore
+  def testReduceDeterministicUDF(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    // if isDeterministic = true, will cause a Calcite NPE, which will be fixed in [CALCITE-1860]
+    val result = table
+      .select('a, 'b, 'c, DeterministicNullFunc() as 'd)
+      .where("d.isNull")
+      .select('a, 'b, 'c)
+
+    val expected: String = streamTableNode(0)
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testReduceNonDeterministicUDF(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val result = table
+      .select('a, 'b, 'c, NonDeterministicNullFunc() as 'd)
+      .where("d.isNull")
+      .select('a, 'b, 'c)
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "a", "b", "c"),
+      term("where", s"IS NULL(${NonDeterministicNullFunc.functionIdentifier}())")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+}
+
+object NonDeterministicNullFunc extends ScalarFunction {
+  def eval(): String = null
+  override def isDeterministic = false
+}
+
+object DeterministicNullFunc extends ScalarFunction {
+  def eval(): String = null
+  override def isDeterministic = true
 }