You are viewing a plain-text version of this content. The canonical link for it (originally hyperlinked as "here") was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/03/13 09:58:30 UTC

flink git commit: [FLINK-5882] [table] TableFunction (UDTF) should support variable types and variable arguments

Repository: flink
Updated Branches:
  refs/heads/master 9b179beae -> 04aee61d8


[FLINK-5882] [table] TableFunction (UDTF) should support variable types and variable arguments

This closes #3407.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/04aee61d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/04aee61d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/04aee61d

Branch: refs/heads/master
Commit: 04aee61d86f9ba30715c133380560739282feb81
Parents: 9b179be
Author: Zhuoluo Yang <zh...@alibaba-inc.com>
Authored: Tue Mar 7 12:02:46 2017 +0800
Committer: twalthr <tw...@apache.org>
Committed: Mon Mar 13 10:55:10 2017 +0100

----------------------------------------------------------------------
 .../codegen/calls/TableFunctionCallGen.scala    | 17 ++++++---
 .../DataSetUserDefinedFunctionITCase.scala      | 37 ++++++++++++++++++++
 .../DataStreamUserDefinedFunctionITCase.scala   | 34 ++++++++++++++++--
 .../table/utils/UserDefinedTableFunctions.scala | 11 +++++-
 4 files changed, 91 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/04aee61d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
index 890b6bd..ba90292 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
@@ -44,14 +44,21 @@ class TableFunctionCallGen(
       codeGenerator: CodeGenerator,
       operands: Seq[GeneratedExpression])
     : GeneratedExpression = {
-    // determine function signature
-    val matchingSignature = getSignature(tableFunction, signature)
+    // determine function method
+    val matchingMethod = getEvalMethod(tableFunction, signature)
       .getOrElse(throw new CodeGenException("No matching signature found."))
+    val matchingSignature = matchingMethod.getParameterTypes
+
+    // zip for variable signatures
+    var paramToOperands = matchingSignature.zip(operands)
+    if (operands.length > matchingSignature.length) {
+      operands.drop(matchingSignature.length).foreach(op =>
+        paramToOperands = paramToOperands :+ (matchingSignature.last.getComponentType, op)
+      )
+    }
 
     // convert parameters for function (output boxing)
-    val parameters = matchingSignature
-        .zip(operands)
-        .map { case (paramClass, operandExpr) =>
+    val parameters = paramToOperands.map { case (paramClass, operandExpr) =>
           if (paramClass.isPrimitive) {
             operandExpr
           } else if (ClassUtils.isPrimitiveWrapper(paramClass)

http://git-wip-us.apache.org/repos/asf/flink/blob/04aee61d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala
index 33b2439..20bbf8b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala
@@ -31,6 +31,7 @@ import org.apache.flink.table.utils._
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.types.Row
+import org.junit.Assert._
 import org.junit.Test
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -277,6 +278,42 @@ class DataSetUserDefinedFunctionITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  @Test
+  def testTableFunctionWithVariableArguments(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
+    val varArgsFunc0 = new VarArgsFunc0
+    tableEnv.registerFunction("VarArgsFunc0", varArgsFunc0)
+
+    val result = testData(env)
+      .toTable(tableEnv, 'a, 'b, 'c)
+      .select('c)
+      .join(varArgsFunc0("1", "2", 'c))
+
+    val expected = "Anna#44,1\n" +
+      "Anna#44,2\n" +
+      "Anna#44,Anna#44\n" +
+      "Jack#22,1\n" +
+      "Jack#22,2\n" +
+      "Jack#22,Jack#22\n" +
+      "John#19,1\n" +
+      "John#19,2\n" +
+      "John#19,John#19\n" +
+      "nosharp,1\n" +
+      "nosharp,2\n" +
+      "nosharp,nosharp"
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+
+    // Test for empty cases
+    val result0 = testData(env)
+      .toTable(tableEnv, 'a, 'b, 'c)
+      .select('c)
+      .join(varArgsFunc0())
+    val results0 = result0.toDataSet[Row].collect()
+    assertTrue(results0.isEmpty)
+  }
+
   private def testData(
       env: ExecutionEnvironment)
     : DataSet[(Int, Long, String)] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/04aee61d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
index e7ce457..853c771 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala
@@ -24,8 +24,7 @@ import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData}
 import org.apache.flink.table.expressions.utils.{Func13, RichFunc2}
-import org.apache.flink.table.utils.{RichTableFunc1, TableFunc0, TableFunc3, UserDefinedFunctionTestUtils}
-import org.apache.flink.table.utils.PojoTableFunc
+import org.apache.flink.table.utils._
 import org.apache.flink.types.Row
 import org.junit.Assert._
 import org.junit.Test
@@ -196,6 +195,37 @@ class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestB
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  @Test
+  def testTableFunctionWithVariableArguments(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val varArgsFunc0 = new VarArgsFunc0
+    tableEnv.registerFunction("VarArgsFunc0", varArgsFunc0)
+
+    val result = testData(env)
+      .toTable(tableEnv, 'a, 'b, 'c)
+      .select('c)
+      .join(varArgsFunc0("1", "2", 'c))
+
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "Anna#44,1",
+      "Anna#44,2",
+      "Anna#44,Anna#44",
+      "Jack#22,1",
+      "Jack#22,2",
+      "Jack#22,Jack#22",
+      "John#19,1",
+      "John#19,2",
+      "John#19,John#19",
+      "nosharp,1",
+      "nosharp,2",
+      "nosharp,nosharp")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
   private def testData(
       env: StreamExecutionEnvironment)
     : DataStream[(Int, Long, String)] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/04aee61d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala
index 88917a2..d0ffade 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala
@@ -23,10 +23,12 @@ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.tuple.Tuple3
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.functions.{TableFunction, FunctionContext}
+import org.apache.flink.table.functions.{FunctionContext, TableFunction}
 import org.apache.flink.types.Row
 import org.junit.Assert
 
+import scala.annotation.varargs
+
 
 case class SimpleUser(name: String, age: Int)
 
@@ -203,3 +205,10 @@ class RichTableFunc1 extends TableFunction[String] {
     separator = None
   }
 }
+
+class VarArgsFunc0 extends TableFunction[String] {
+  @varargs
+  def eval(str: String*): Unit = {
+    str.foreach(collect)
+  }
+}