You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/10/02 10:14:00 UTC

[jira] [Commented] (FLINK-10451) TableFunctionCollector should handle the life cycle of ScalarFunction

    [ https://issues.apache.org/jira/browse/FLINK-10451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635248#comment-16635248 ] 

ASF GitHub Bot commented on FLINK-10451:
----------------------------------------

asfgit closed pull request #6771: [FLINK-10451] [table] TableFunctionCollector should handle the life cycle of ScalarFunction
URL: https://github.com/apache/flink/pull/6771
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala
index 9fc76e32983..85d858fb75b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala
@@ -18,6 +18,7 @@
 package org.apache.flink.table.codegen
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.codegen.CodeGenUtils.{boxedTypeTermForTypeInfo, newName}
 import org.apache.flink.table.codegen.Indenter.toISC
@@ -63,7 +64,8 @@ class CollectorCodeGenerator(
   def generateTableFunctionCollector(
       name: String,
       bodyCode: String,
-      collectedType: TypeInformation[Any])
+      collectedType: TypeInformation[Any],
+      codeGenerator: CodeGenerator)
     : GeneratedCollector = {
 
     val className = newName(name)
@@ -95,6 +97,11 @@ class CollectorCodeGenerator(
       |  }
       |
       |  @Override
+      |  public void open(${classOf[Configuration].getCanonicalName} parameters) throws Exception {
+      |    ${codeGenerator.reuseOpenCode()}
+      |  }
+      |
+      |  @Override
       |  public void collect(Object record) throws Exception {
       |    super.collect(record);
       |    $input1TypeClass $input1Term = ($input1TypeClass) getInput();
@@ -105,7 +112,8 @@ class CollectorCodeGenerator(
       |  }
       |
       |  @Override
-      |  public void close() {
+      |  public void close() throws Exception {
+      |    ${codeGenerator.reuseCloseCode()}
       |  }
       |}
       |""".stripMargin
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
index 43314577ab8..3475e1901e9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
@@ -136,6 +136,13 @@ trait CommonCorrelate {
       returnSchema.typeInfo,
       returnSchema.fieldNames)
 
+    val filterGenerator = new FunctionCodeGenerator(
+      config,
+      false,
+      udtfTypeInfo,
+      None,
+      pojoFieldMapping)
+
     val collectorCode = if (condition.isEmpty) {
       s"""
          |${crossResultExpr.code}
@@ -153,13 +160,6 @@ trait CommonCorrelate {
       //   The generated expression is discarded.
       generator.generateExpression(condition.get.accept(changeInputRefIndexShuttle))
 
-      val filterGenerator = new FunctionCodeGenerator(
-        config,
-        false,
-        udtfTypeInfo,
-        None,
-        pojoFieldMapping)
-
       filterGenerator.input1Term = filterGenerator.input2Term
       val filterCondition = filterGenerator.generateExpression(condition.get)
       s"""
@@ -175,7 +175,8 @@ trait CommonCorrelate {
     generator.generateTableFunctionCollector(
       "TableFunctionCollector",
       collectorCode,
-      udtfTypeInfo)
+      udtfTypeInfo,
+      filterGenerator)
   }
 
   private[flink] def selectToString(rowType: RelDataType): String = {
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala
index 2553d9cd67b..747828cedbd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala
@@ -59,7 +59,9 @@ class CRowCorrelateProcessRunner(
     val constructor = processClazz.getConstructor(classOf[TableFunctionCollector[_]])
     LOG.debug("Instantiating ProcessFunction.")
     function = constructor.newInstance(collector).asInstanceOf[ProcessFunction[Row, Row]]
+    FunctionUtils.setFunctionRuntimeContext(collector, getRuntimeContext)
     FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
+    FunctionUtils.openFunction(collector, parameters)
     FunctionUtils.openFunction(function, parameters)
   }
 
@@ -85,6 +87,7 @@ class CRowCorrelateProcessRunner(
   override def getProducedType: TypeInformation[CRow] = returnType
 
   override def close(): Unit = {
+    FunctionUtils.closeFunction(collector)
     FunctionUtils.closeFunction(function)
   }
 }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CorrelateFlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CorrelateFlatMapRunner.scala
index e2f5e611336..811169bcdc5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CorrelateFlatMapRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CorrelateFlatMapRunner.scala
@@ -52,7 +52,9 @@ class CorrelateFlatMapRunner[IN, OUT](
     val constructor = flatMapClazz.getConstructor(classOf[TableFunctionCollector[_]])
     LOG.debug("Instantiating FlatMapFunction.")
     function = constructor.newInstance(collector).asInstanceOf[FlatMapFunction[IN, OUT]]
+    FunctionUtils.setFunctionRuntimeContext(collector, getRuntimeContext)
     FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
+    FunctionUtils.openFunction(collector, parameters)
     FunctionUtils.openFunction(function, parameters)
   }
 
@@ -66,6 +68,7 @@ class CorrelateFlatMapRunner[IN, OUT](
   override def getProducedType: TypeInformation[OUT] = returnType
 
   override def close(): Unit = {
+    FunctionUtils.closeFunction(collector)
     FunctionUtils.closeFunction(function)
   }
 }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/TableFunctionCollector.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/TableFunctionCollector.scala
index c9cca47d165..7db424489a5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/TableFunctionCollector.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/TableFunctionCollector.scala
@@ -17,12 +17,13 @@
  */
 package org.apache.flink.table.runtime
 
+import org.apache.flink.api.common.functions.AbstractRichFunction
 import org.apache.flink.util.Collector
 
 /**
   * The basic implementation of collector for [[org.apache.flink.table.functions.TableFunction]].
   */
-abstract class TableFunctionCollector[T] extends Collector[T] {
+abstract class TableFunctionCollector[T] extends AbstractRichFunction with Collector[T] {
 
   private var input: Any = _
   private var collector: Collector[_] = _
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
index 32e5d71662e..6d02afca510 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
@@ -290,6 +290,23 @@ object Func19 extends ScalarFunction {
 
 }
 
+class Func20 extends ScalarFunction {
+
+  private var permitted: Boolean = false
+
+  override def open(context: FunctionContext): Unit = {
+    permitted = true
+  }
+
+  def eval(x: Int): Boolean = {
+    permitted
+  }
+
+  override def close(): Unit = {
+    permitted = false
+  }
+}
+
 class SplitUDF(deterministic: Boolean) extends ScalarFunction {
   def eval(x: String, sep: String, index: Int): String = {
     val splits = StringUtils.splitByWholeSeparator(x, sep)
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
index b385015102c..4a0a08222e2 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.{TableEnvironment, Types, ValidationException}
-import org.apache.flink.table.expressions.utils.{Func1, Func18, RichFunc2}
+import org.apache.flink.table.expressions.utils.{Func1, Func18, Func20, RichFunc2}
 import org.apache.flink.table.runtime.utils.JavaUserDefinedTableFunctions.JavaTableFunc0
 import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.table.runtime.utils.{TableProgramsClusterTestBase, _}
@@ -367,6 +367,33 @@ class CorrelateITCase(
     assertTrue(results0.isEmpty)
   }
 
+  @Test
+  def testTableFunctionWithFilterInside(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
+    val t = testData(env).toTable(tableEnv).as('a, 'b, 'c)
+    val func0 = new TableFunc0
+    val func20 = new Func20
+
+    val result = t
+      .join(func0('c) as('d, 'e))
+      .where(func20('e))
+      .select('c, 'd, 'e)
+
+    val results = result.toDataSet[Row].collect()
+
+    val expected = Seq (
+      "Jack#22,Jack,22",
+      "John#19,John,19",
+      "Anna#44,Anna,44"
+    )
+
+    TestBaseUtils.compareResultAsText(
+      results.asJava,
+      expected.sorted.mkString("\n")
+    )
+  }
+
   private def testData(
       env: ExecutionEnvironment)
     : DataSet[(Int, Long, String)] = {
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala
index 0f563e61f64..d71742d0def 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.{TableEnvironment, Types, ValidationException}
-import org.apache.flink.table.expressions.utils.{Func18, RichFunc2}
+import org.apache.flink.table.expressions.utils.{Func18, Func20, RichFunc2}
 import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, _}
 import org.apache.flink.table.utils._
 import org.apache.flink.test.util.AbstractTestBase
@@ -257,6 +257,33 @@ class CorrelateITCase extends AbstractTestBase {
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  @Test
+  def testTableFunctionWithFilterInside(): Unit = {
+    val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
+    val func0 = new TableFunc0
+    val func20 = new Func20
+
+    val result = t
+      .join(func0('c) as('d, 'e))
+      .where(func20('e))
+      .select('c, 'd, 'e)
+      .toAppendStream[Row]
+
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = Seq (
+      "Jack#22,Jack,22",
+      "John#19,John,19",
+      "Anna#44,Anna,44"
+    )
+
+    assertEquals(
+      expected.sorted,
+      StreamITCase.testResults.sorted
+    )
+  }
+
   private def testData(
       env: StreamExecutionEnvironment)
     : DataStream[(Int, Long, String)] = {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> TableFunctionCollector should handle the life cycle of ScalarFunction
> ---------------------------------------------------------------------
>
>                 Key: FLINK-10451
>                 URL: https://issues.apache.org/jira/browse/FLINK-10451
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Ruidong Li
>            Assignee: Ruidong Li
>            Priority: Major
>              Labels: pull-request-available
>
> Considering the following query:
> table.join(udtf('a)).where(udf('b))
> the filter will be pushed into DataSetCorrelate/DataStreamCorrelate without triggering open() and close()



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)