You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/10/25 08:39:16 UTC
[flink] branch release-1.9 updated: [FLINK-14408][table-planner]
Fix open() is not called if UDF is reduced during optimization
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new e1825ee [FLINK-14408][table-planner] Fix open() is not called if UDF is reduced during optimization
e1825ee is described below
commit e1825eebd8927ef92c73b95c95bce56fdc5f4a89
Author: whlwanghailong <wh...@didichuxing.com>
AuthorDate: Wed Oct 16 23:04:56 2019 +0800
[FLINK-14408][table-planner] Fix open() is not called if UDF is reduced during optimization
---
.../table/planner/codegen/ExpressionReducer.scala | 6 +-
.../apache/flink/table/codegen/CodeGenerator.scala | 9 ++-
.../flink/table/codegen/ExpressionReducer.scala | 75 ++++++++++++++++++++--
.../table/plan/ExpressionReductionRulesTest.scala | 36 +++++++++++
4 files changed, 113 insertions(+), 13 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
index bb5ff90..941b681 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
@@ -102,11 +102,7 @@ class ExpressionReducer(
case _ => throw new TableException("RichMapFunction[GenericRow, GenericRow] required here")
}
- val parameters = if (config.getConfiguration != null) {
- config.getConfiguration
- } else {
- new Configuration()
- }
+ val parameters = config.getConfiguration
val reduced = try {
richMapFunction.open(parameters)
// execute
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index 2a7bb67..c6f7d76 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -1617,7 +1617,10 @@ abstract class CodeGenerator(
* @param contextTerm [[RuntimeContext]] term to access the [[RuntimeContext]]
* @return member variable term
*/
- def addReusableFunction(function: UserDefinedFunction, contextTerm: String = null): String = {
+ def addReusableFunction(
+ function: UserDefinedFunction,
+ contextTerm: String = null,
+ functionContextClass: Class[_ <: FunctionContext] = classOf[FunctionContext]): String = {
val classQualifier = function.getClass.getCanonicalName
val functionSerializedData = EncodingUtils.encodeObjectToString(function)
val fieldTerm = s"function_${function.functionIdentifier}"
@@ -1640,11 +1643,11 @@ abstract class CodeGenerator(
val openFunction = if (contextTerm != null) {
s"""
- |$fieldTerm.open(new ${classOf[FunctionContext].getCanonicalName}($contextTerm));
+ |$fieldTerm.open(new ${functionContextClass.getCanonicalName}($contextTerm));
""".stripMargin
} else {
s"""
- |$fieldTerm.open(new ${classOf[FunctionContext].getCanonicalName}(getRuntimeContext()));
+ |$fieldTerm.open(new ${functionContextClass.getCanonicalName}(getRuntimeContext()));
""".stripMargin
}
reusableOpenStatements.add(openFunction)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
index dfed70b..a612eb9 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
@@ -18,22 +18,27 @@
package org.apache.flink.table.codegen
+import java.io.File
import java.util
import org.apache.calcite.plan.RelOptPlanner
import org.apache.calcite.rex.{RexBuilder, RexNode}
import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.flink.api.common.functions.util.FunctionUtils
import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.metrics.MetricGroup
import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.functions.{FunctionContext, UserDefinedFunction}
import org.apache.flink.types.Row
import scala.collection.JavaConverters._
/**
- * Evaluates constant expressions using Flink's [[FunctionCodeGenerator]].
+ * Evaluates constant expressions using Flink's [[ConstantFunctionCodeGenerator]].
*/
class ExpressionReducer(config: TableConfig)
extends RelOptPlanner.Executor with Compiler[MapFunction[Row, Row]] {
@@ -83,8 +88,9 @@ class ExpressionReducer(config: TableConfig)
val literalTypes = literals.map(e => FlinkTypeFactory.toTypeInfo(e.getType))
val resultType = new RowTypeInfo(literalTypes: _*)
+ val parameters = config.getConfiguration
// generate MapFunction
- val generator = new FunctionCodeGenerator(config, false, EMPTY_ROW_INFO)
+ val generator = new ConstantFunctionCodeGenerator(config, false, EMPTY_ROW_INFO)
val result = generator.generateResultExpression(
resultType,
@@ -106,8 +112,12 @@ class ExpressionReducer(config: TableConfig)
generatedFunction.code)
val function = clazz.newInstance()
- // execute
- val reduced = function.map(EMPTY_ROW)
+ val reduced = try {
+ FunctionUtils.openFunction(function, parameters)
+ function.map(EMPTY_ROW)
+ } finally {
+ FunctionUtils.closeFunction(function)
+ }
// add the reduced results or keep them unreduced
var i = 0
@@ -147,3 +157,58 @@ class ExpressionReducer(config: TableConfig)
}
}
}
+
+/**
+ * A [[ConstantFunctionContext]] allows obtaining user-defined configuration information set
+ * in [[TableConfig]].
+ *
+ * @param parameters User-defined configuration set in [[TableConfig]].
+ */
+class ConstantFunctionContext(parameters: Configuration) extends FunctionContext(null) {
+
+ override def getMetricGroup: MetricGroup = {
+ throw new UnsupportedOperationException("getMetricGroup is not supported when optimizing")
+ }
+
+ override def getCachedFile(name: String): File = {
+ throw new UnsupportedOperationException("getCachedFile is not supported when optimizing")
+ }
+
+ /**
+ * Gets the user-defined configuration value associated with the given key as a string.
+ *
+ * @param key key pointing to the associated value
+ * @param defaultValue default value which is returned in case user-defined configuration
+ * value is null or there is no value associated with the given key
+ * @return (default) value associated with the given key
+ */
+ override def getJobParameter(key: String, defaultValue: String): String = {
+ parameters.getString(key, defaultValue)
+ }
+}
+
+/**
+ * A [[ConstantFunctionCodeGenerator]] generates the code used to evaluate constant expressions.
+ * @param config configuration that determines runtime behavior
+ * @param nullableInput input(s) can be null.
+ * @param input1 type information about the first input of the Function
+ */
+class ConstantFunctionCodeGenerator(
+ config: TableConfig,
+ nullableInput: Boolean,
+ input1: TypeInformation[_ <: Any])
+ extends FunctionCodeGenerator(config, nullableInput, input1) {
+ /**
+ * Adds a reusable [[UserDefinedFunction]] to the member area of the generated [[Function]].
+ *
+ * @param function [[UserDefinedFunction]] object to be instantiated during runtime
+ * @param contextTerm term to access the Context (ignored; "parameters" is always used)
+ * @return member variable term
+ */
+ override def addReusableFunction(
+ function: UserDefinedFunction,
+ contextTerm: String = null,
+ functionContextClass: Class[_ <: FunctionContext] = classOf[FunctionContext]): String = {
+ super.addReusableFunction(function, "parameters", classOf[ConstantFunctionContext])
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala
index f3cc8ca..3f36086 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.plan
import org.apache.flink.api.scala._
import org.apache.flink.table.api.Types
import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.utils.{Func1, RichFunc1}
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.utils.TableTestBase
import org.apache.flink.table.utils.TableTestUtil._
@@ -499,6 +500,41 @@ class ExpressionReductionRulesTest extends TableTestBase {
util.verifyTable(result, expected)
}
+
+ @Test
+ def testExpressionReductionWithUDF(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+ util.addFunction("MyUdf", Func1)
+
+ val expected = unaryNode(
+ "DataStreamCalc",
+ streamTableNode(table),
+ term("select", "CAST(2) AS constantValue")
+ )
+
+ util.verifySql("SELECT MyUdf(1) as constantValue FROM MyTable", expected)
+
+ }
+
+ @Test
+ def testExpressionReductionWithRichUDF(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+ util.addFunction("MyUdf", new RichFunc1)
+ util.tableEnv.getConfig.getConfiguration.setString("int.value", "10")
+
+ val expected = unaryNode(
+ "DataStreamCalc",
+ streamTableNode(table),
+ term("select", "CAST(11) AS constantValue")
+ )
+
+ util.verifySql("SELECT MyUdf(1) as constantValue FROM MyTable", expected)
+ }
+
}
object NonDeterministicNullFunc extends ScalarFunction {