Posted to commits@flink.apache.org by ja...@apache.org on 2019/10/25 08:39:16 UTC

[flink] branch release-1.9 updated: [FLINK-14408][table-planner] Fix open() is not called if UDF is reduced during optimization

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new e1825ee  [FLINK-14408][table-planner] Fix open() is not called if UDF is reduced during optimization
e1825ee is described below

commit e1825eebd8927ef92c73b95c95bce56fdc5f4a89
Author: whlwanghailong <wh...@didichuxing.com>
AuthorDate: Wed Oct 16 23:04:56 2019 +0800

    [FLINK-14408][table-planner] Fix open() is not called if UDF is reduced during optimization
---
 .../table/planner/codegen/ExpressionReducer.scala  |  6 +-
 .../apache/flink/table/codegen/CodeGenerator.scala |  9 ++-
 .../flink/table/codegen/ExpressionReducer.scala    | 75 ++++++++++++++++++++--
 .../table/plan/ExpressionReductionRulesTest.scala  | 36 +++++++++++
 4 files changed, 113 insertions(+), 13 deletions(-)
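
For context on the bug: when a call to a rich UDF with only constant arguments is folded at plan time, the reduced MapFunction used to be executed without ever invoking the UDF's open(), so any state initialized there (such as job parameters) was silently missing. A minimal sketch of a function that trips over this, modeled loosely on the RichFunc1 test utility used in the new tests (the class name and behavior here are illustrative, not part of the commit):

    import org.apache.flink.table.functions.{FunctionContext, ScalarFunction}

    // Hypothetical rich UDF: open() must run before eval() produces correct results.
    // Before this fix, constant-folding a call like "MyUdf(1)" during optimization
    // skipped open(), so `offset` kept its default instead of the configured value.
    class ParameterizedAdd extends ScalarFunction {
      private var offset: Int = 0

      override def open(context: FunctionContext): Unit = {
        // reads the user-defined configuration value set via TableConfig
        offset = context.getJobParameter("int.value", "0").toInt
      }

      def eval(x: Int): Int = x + offset
    }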

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
index bb5ff90..941b681 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
@@ -102,11 +102,7 @@ class ExpressionReducer(
       case _ => throw new TableException("RichMapFunction[GenericRow, GenericRow] required here")
     }
 
-    val parameters = if (config.getConfiguration != null) {
-      config.getConfiguration
-    } else {
-      new Configuration()
-    }
+    val parameters = config.getConfiguration
     val reduced = try {
       richMapFunction.open(parameters)
       // execute
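
In the blink planner, TableConfig.getConfiguration never returns null, so the fallback to a fresh Configuration is dropped and the user-defined parameters are handed straight to richMapFunction.open(parameters). A short sketch of how such parameters are set on the table config (mirroring the test added below; the key is illustrative):

    import org.apache.flink.table.api.TableConfig

    // Parameters set here are now visible in open() of UDFs that get constant-folded.
    val tableConfig = new TableConfig
    tableConfig.getConfiguration.setString("int.value", "10")
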
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index 2a7bb67..c6f7d76 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -1617,7 +1617,10 @@ abstract class CodeGenerator(
     * @param contextTerm [[RuntimeContext]] term to access the [[RuntimeContext]]
     * @return member variable term
     */
-  def addReusableFunction(function: UserDefinedFunction, contextTerm: String = null): String = {
+  def addReusableFunction(
+      function: UserDefinedFunction,
+      contextTerm: String = null,
+      functionContextClass: Class[_ <: FunctionContext] = classOf[FunctionContext]): String = {
     val classQualifier = function.getClass.getCanonicalName
     val functionSerializedData = EncodingUtils.encodeObjectToString(function)
     val fieldTerm = s"function_${function.functionIdentifier}"
@@ -1640,11 +1643,11 @@ abstract class CodeGenerator(
 
     val openFunction = if (contextTerm != null) {
       s"""
-         |$fieldTerm.open(new ${classOf[FunctionContext].getCanonicalName}($contextTerm));
+         |$fieldTerm.open(new ${functionContextClass.getCanonicalName}($contextTerm));
        """.stripMargin
     } else {
       s"""
-         |$fieldTerm.open(new ${classOf[FunctionContext].getCanonicalName}(getRuntimeContext()));
+         |$fieldTerm.open(new ${functionContextClass.getCanonicalName}(getRuntimeContext()));
        """.stripMargin
     }
     reusableOpenStatements.add(openFunction)
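
The new functionContextClass parameter lets a subclass control which FunctionContext implementation wraps the context term in the generated open() call; the default reproduces the old behavior. A self-contained sketch of the statement that gets emitted (field and context terms are placeholders):

    import org.apache.flink.table.functions.FunctionContext

    // classOf[FunctionContext] is the default; ConstantFunctionCodeGenerator below
    // passes classOf[ConstantFunctionContext] instead.
    val functionContextClass: Class[_ <: FunctionContext] = classOf[FunctionContext]
    val fieldTerm = "function_myUdf"        // placeholder for the generated member name
    val contextTerm = "getRuntimeContext()" // placeholder for the context term
    val openStatement =
      s"$fieldTerm.open(new ${functionContextClass.getCanonicalName}($contextTerm));"
    // => function_myUdf.open(new org.apache.flink.table.functions.FunctionContext(getRuntimeContext()));
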
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
index dfed70b..a612eb9 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
@@ -18,22 +18,27 @@
 
 package org.apache.flink.table.codegen
 
+import java.io.File
 import java.util
 
 import org.apache.calcite.plan.RelOptPlanner
 import org.apache.calcite.rex.{RexBuilder, RexNode}
 import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.flink.api.common.functions.util.FunctionUtils
 import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.metrics.MetricGroup
 import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.functions.{FunctionContext, UserDefinedFunction}
 import org.apache.flink.types.Row
 
 import scala.collection.JavaConverters._
 
 /**
-  * Evaluates constant expressions using Flink's [[FunctionCodeGenerator]].
+  * Evaluates constant expressions using Flink's [[ConstantFunctionCodeGenerator]].
   */
 class ExpressionReducer(config: TableConfig)
   extends RelOptPlanner.Executor with Compiler[MapFunction[Row, Row]] {
@@ -83,8 +88,9 @@ class ExpressionReducer(config: TableConfig)
     val literalTypes = literals.map(e => FlinkTypeFactory.toTypeInfo(e.getType))
     val resultType = new RowTypeInfo(literalTypes: _*)
 
+    val parameters = config.getConfiguration
     // generate MapFunction
-    val generator = new FunctionCodeGenerator(config, false, EMPTY_ROW_INFO)
+    val generator = new ConstantFunctionCodeGenerator(config, false, EMPTY_ROW_INFO)
 
     val result = generator.generateResultExpression(
       resultType,
@@ -106,8 +112,12 @@ class ExpressionReducer(config: TableConfig)
       generatedFunction.code)
     val function = clazz.newInstance()
 
-    // execute
-    val reduced = function.map(EMPTY_ROW)
+    val reduced = try {
+      FunctionUtils.openFunction(function, parameters)
+      function.map(EMPTY_ROW)
+    } finally {
+      FunctionUtils.closeFunction(function)
+    }
 
     // add the reduced results or keep them unreduced
     var i = 0
@@ -147,3 +157,58 @@ class ExpressionReducer(config: TableConfig)
     }
   }
 }
+
+/**
+  * A [[ConstantFunctionContext]] allows obtaining user-defined configuration information set
+  * in [[TableConfig]].
+  *
+  * @param parameters User-defined configuration set in [[TableConfig]].
+  */
+class ConstantFunctionContext(parameters: Configuration) extends FunctionContext(null) {
+
+  override def getMetricGroup: MetricGroup = {
+    throw new UnsupportedOperationException("getMetricGroup is not supported when optimizing")
+  }
+
+  override def getCachedFile(name: String): File = {
+    throw new UnsupportedOperationException("getCachedFile is not supported when optimizing")
+  }
+
+  /**
+    * Gets the user-defined configuration value associated with the given key as a string.
+    *
+    * @param key          key pointing to the associated value
+    * @param defaultValue default value which is returned in case user-defined configuration
+    *                     value is null or there is no value associated with the given key
+    * @return (default) value associated with the given key
+    */
+  override def getJobParameter(key: String, defaultValue: String): String = {
+    parameters.getString(key, defaultValue)
+  }
+}
+
+/**
+  * A [[ConstantFunctionCodeGenerator]] is used for constant expression code generation.
+  * @param config configuration that determines runtime behavior
+  * @param nullableInput input(s) can be null.
+  * @param input1 type information about the first input of the Function
+  */
+class ConstantFunctionCodeGenerator(
+    config: TableConfig,
+    nullableInput: Boolean,
+    input1: TypeInformation[_ <: Any])
+  extends FunctionCodeGenerator(config, nullableInput, input1) {
+  /**
+    * Adds a reusable [[UserDefinedFunction]] to the member area of the generated [[Function]].
+    *
+    * @param function    [[UserDefinedFunction]] object to be instantiated during runtime
+    * @param contextTerm term to access the Context
+    * @return member variable term
+    */
+  override def addReusableFunction(
+      function: UserDefinedFunction,
+      contextTerm: String = null,
+      functionContextClass: Class[_ <: FunctionContext] = classOf[FunctionContext]): String = {
+    super.addReusableFunction(function, "parameters", classOf[ConstantFunctionContext])
+  }
+}
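
To make the new context's behavior concrete, a short usage sketch (the key and values are hypothetical; only getJobParameter is backed by real data, the other accessors throw as shown above):

    import org.apache.flink.configuration.Configuration

    // Values set on TableConfig's Configuration become visible to UDFs that are
    // opened during constant expression reduction.
    val parameters = new Configuration()
    parameters.setString("int.value", "10")

    val context = new ConstantFunctionContext(parameters)
    val configured = context.getJobParameter("int.value", "0")   // "10"
    val fallback   = context.getJobParameter("missing.key", "0") // falls back to "0"
    // context.getMetricGroup and context.getCachedFile(...) throw UnsupportedOperationException
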
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala
index f3cc8ca..3f36086 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.plan
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.Types
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.utils.{Func1, RichFunc1}
 import org.apache.flink.table.functions.ScalarFunction
 import org.apache.flink.table.utils.TableTestBase
 import org.apache.flink.table.utils.TableTestUtil._
@@ -499,6 +500,41 @@ class ExpressionReductionRulesTest extends TableTestBase {
 
     util.verifyTable(result, expected)
   }
+
+  @Test
+  def testExpressionReductionWithUDF(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    util.addFunction("MyUdf", Func1)
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(table),
+      term("select", "CAST(2) AS constantValue")
+    )
+
+    util.verifySql("SELECT MyUdf(1) as constantValue FROM MyTable", expected)
+
+  }
+
+  @Test
+  def testExpressionReductionWithRichUDF(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    util.addFunction("MyUdf", new RichFunc1)
+    util.tableEnv.getConfig.getConfiguration.setString("int.value", "10")
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(table),
+      term("select", "CAST(11) AS constantValue")
+    )
+
+    util.verifySql("SELECT MyUdf(1) as constantValue FROM MyTable", expected)
+  }
+
 }
 
 object NonDeterministicNullFunc extends ScalarFunction {