You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/11/15 16:33:56 UTC

flink git commit: [FLINK-7678] [table] Support composite inputs for user-defined functions

Repository: flink
Updated Branches:
  refs/heads/master 11218a35d -> 54eeccfe1


[FLINK-7678] [table] Support composite inputs for user-defined functions

This closes #4726.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/54eeccfe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/54eeccfe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/54eeccfe

Branch: refs/heads/master
Commit: 54eeccfe18ace636b77ff0226ce1f91d35e4d49d
Parents: 11218a3
Author: twalthr <tw...@apache.org>
Authored: Tue Sep 26 12:10:33 2017 +0200
Committer: twalthr <tw...@apache.org>
Committed: Wed Nov 15 17:32:26 2017 +0100

----------------------------------------------------------------------
 .../table/api/stream/sql/CorrelateTest.scala    | 35 ++++++++++++++++++++
 .../UserDefinedScalarFunctionTest.scala         | 20 +++++++++--
 .../utils/userDefinedScalarFunctions.scala      | 16 ++++++++-
 .../runtime/batch/table/CorrelateITCase.scala   |  9 +++--
 .../runtime/stream/table/CorrelateITCase.scala  | 33 +++++++++++++++---
 .../table/utils/UserDefinedTableFunctions.scala | 27 +++++++++++----
 6 files changed, 121 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/54eeccfe/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
index f243158..efb83b4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
@@ -19,10 +19,12 @@
 package org.apache.flink.table.api.stream.sql
 
 import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Types
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.runtime.utils.JavaUserDefinedTableFunctions.JavaVarsArgTableFunc0
 import org.apache.flink.table.utils.TableTestUtil._
 import org.apache.flink.table.utils.{HierarchyTableFunction, PojoTableFunc, TableFunc2, _}
+import org.apache.flink.types.Row
 import org.junit.Test
 
 class CorrelateTest extends TableTestBase {
@@ -227,6 +229,39 @@ class CorrelateTest extends TableTestBase {
   }
 
   @Test
+  def testRowType(): Unit = {
+    val util = streamTestUtil()
+    val rowType = Types.ROW(Types.INT, Types.BOOLEAN, Types.ROW(Types.INT, Types.INT, Types.INT))
+    util.addTable[Row]("MyTable", 'a, 'b, 'c)(rowType)
+    val function = new TableFunc5
+    util.addFunction("tableFunc5", function)
+
+    val sqlQuery = "SELECT c, tf.f2 FROM MyTable, LATERAL TABLE(tableFunc5(c)) AS tf"
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamCorrelate",
+        streamTableNode(0),
+        term("invocation", "tableFunc5($cor0.c)"),
+        term("correlate", "table(tableFunc5($cor0.c))"),
+        term("select", "a", "b", "c", "f0", "f1", "f2"),
+        term("rowType", "RecordType(" +
+          "INTEGER a, " +
+          "BOOLEAN b, " +
+          "COMPOSITE(Row(f0: Integer, f1: Integer, f2: Integer)) c, " +
+          "INTEGER f0, " +
+          "INTEGER f1, " +
+          "INTEGER f2)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "f2")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
   def testFilter(): Unit = {
     val util = streamTestUtil()
     val func2 = new TableFunc2

http://git-wip-us.apache.org/repos/asf/flink/blob/54eeccfe/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
index a01f2ae..dbfe5f6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
@@ -30,6 +30,7 @@ import org.apache.flink.table.api.scala._
 import org.apache.flink.table.expressions.utils.{ExpressionTestBase, _}
 import org.apache.flink.table.functions.ScalarFunction
 import org.junit.Test
+import java.lang.{Boolean => JBoolean}
 
 class UserDefinedScalarFunctionTest extends ExpressionTestBase {
 
@@ -107,6 +108,14 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
       "Nullable(f0)",
       "Nullable(f0)",
       "42")
+
+    // test row type input
+    testAllApis(
+      Func19('f14),
+      "Func19(f14)",
+      "Func19(f14)",
+      "12,true,1,2,3"
+    )
   }
 
   @Test
@@ -368,7 +377,7 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
   // ----------------------------------------------------------------------------------------------
 
   override def testData: Any = {
-    val testData = new Row(14)
+    val testData = new Row(15)
     testData.setField(0, 42)
     testData.setField(1, "Test")
     testData.setField(2, null)
@@ -383,6 +392,11 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
     testData.setField(11, 3.toByte)
     testData.setField(12, 3.toShort)
     testData.setField(13, 3.toFloat)
+    testData.setField(14, Row.of(
+      12.asInstanceOf[Integer],
+      true.asInstanceOf[JBoolean],
+      Row.of(1.asInstanceOf[Integer], 2.asInstanceOf[Integer], 3.asInstanceOf[Integer]))
+    )
     testData
   }
 
@@ -401,7 +415,8 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
       BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO,
       Types.BYTE,
       Types.SHORT,
-      Types.FLOAT
+      Types.FLOAT,
+      Types.ROW(Types.INT, Types.BOOLEAN, Types.ROW(Types.INT, Types.INT, Types.INT))
     ).asInstanceOf[TypeInformation[Any]]
   }
 
@@ -427,6 +442,7 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
     "Func15" -> Func15,
     "Func16" -> Func16,
     "Func17" -> Func17,
+    "Func19" -> Func19,
     "JavaFunc0" -> new JavaFunc0,
     "JavaFunc1" -> new JavaFunc1,
     "JavaFunc2" -> new JavaFunc2,

http://git-wip-us.apache.org/repos/asf/flink/blob/54eeccfe/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
index 9535cdf..3f6ebbd 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
@@ -22,7 +22,8 @@ import java.sql.{Date, Time, Timestamp}
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api.Types
-import org.apache.flink.table.functions.{ScalarFunction, FunctionContext}
+import org.apache.flink.table.functions.{FunctionContext, ScalarFunction}
+import org.apache.flink.types.Row
 import org.junit.Assert
 
 import scala.annotation.varargs
@@ -274,3 +275,16 @@ object Func18 extends ScalarFunction {
     str.startsWith(prefix)
   }
 }
+
+object Func19 extends ScalarFunction {
+  def eval(row: Row): Row = {
+    row
+  }
+
+  override def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] =
+    Array(Types.ROW(Types.INT, Types.BOOLEAN, Types.ROW(Types.INT, Types.INT, Types.INT)))
+
+  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
+    Types.ROW(Types.INT, Types.BOOLEAN, Types.ROW(Types.INT, Types.INT, Types.INT))
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/54eeccfe/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
index 79243dd..828a9e2 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
@@ -22,13 +22,12 @@ import java.sql.{Date, Timestamp}
 
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.table.api.{TableEnvironment, TableException, Types, ValidationException}
-import org.apache.flink.table.runtime.utils.JavaUserDefinedTableFunctions.JavaTableFunc0
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.expressions.utils.{Func1, Func13, Func18, RichFunc2}
-import org.apache.flink.table.runtime.utils.TableProgramsClusterTestBase
+import org.apache.flink.table.api.{TableEnvironment, Types, ValidationException}
+import org.apache.flink.table.expressions.utils.{Func1, Func18, RichFunc2}
+import org.apache.flink.table.runtime.utils.JavaUserDefinedTableFunctions.JavaTableFunc0
 import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.table.runtime.utils._
+import org.apache.flink.table.runtime.utils.{TableProgramsClusterTestBase, _}
 import org.apache.flink.table.utils._
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils

http://git-wip-us.apache.org/repos/asf/flink/blob/54eeccfe/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala
index 79f3f58..215526d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala
@@ -17,14 +17,15 @@
  */
 package org.apache.flink.table.runtime.stream.table
 
+import java.lang.{Boolean => JBoolean}
+
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.api.{TableEnvironment, ValidationException}
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, Types, ValidationException}
 import org.apache.flink.table.expressions.utils.{Func18, RichFunc2}
-import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData}
-import org.apache.flink.table.runtime.utils._
+import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, _}
 import org.apache.flink.table.utils._
 import org.apache.flink.types.Row
 import org.junit.Assert._
@@ -231,6 +232,31 @@ class CorrelateITCase extends StreamingMultipleProgramsTestBase {
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  @Test
+  def testRowType(): Unit = {
+    val row = Row.of(
+      12.asInstanceOf[Integer],
+      true.asInstanceOf[JBoolean],
+      Row.of(1.asInstanceOf[Integer], 2.asInstanceOf[Integer], 3.asInstanceOf[Integer])
+    )
+
+    val rowType = Types.ROW(Types.INT, Types.BOOLEAN, Types.ROW(Types.INT, Types.INT, Types.INT))
+    val in = env.fromElements(row, row)(rowType).toTable(tEnv).as('a, 'b, 'c)
+
+    val tableFunc5 = new TableFunc5()
+    val result = in
+      .join(tableFunc5('c) as ('f0, 'f1, 'f2))
+      .select('c, 'f2)
+
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,2,3,3",
+      "1,2,3,3")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
   private def testData(
       env: StreamExecutionEnvironment)
     : DataStream[(Int, Long, String)] = {
@@ -242,5 +268,4 @@ class CorrelateITCase extends StreamingMultipleProgramsTestBase {
     data.+=((4, 3L, "nosharp"))
     env.fromCollection(data)
   }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/54eeccfe/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala
index e1af23b..9060db5 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala
@@ -119,6 +119,26 @@ class TableFunc4 extends TableFunction[Row] {
   }
 }
 
+class TableFunc5 extends TableFunction[Row] {
+  def eval(row: Row): Unit = {
+    collect(row)
+  }
+
+  override def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] =
+    Array(Types.ROW(Types.INT, Types.INT, Types.INT))
+
+  override def getResultType: TypeInformation[Row] =
+    Types.ROW(Types.INT, Types.INT, Types.INT)
+
+}
+
+class VarArgsFunc0 extends TableFunction[String] {
+  @varargs
+  def eval(str: String*): Unit = {
+    str.foreach(collect)
+  }
+}
+
 class HierarchyTableFunction extends SplittableTableFunction[Boolean, Integer] {
   def eval(user: String) {
     if (user.contains("#")) {
@@ -215,10 +235,3 @@ class RichTableFunc1 extends TableFunction[String] {
     separator = None
   }
 }
-
-class VarArgsFunc0 extends TableFunction[String] {
-  @varargs
-  def eval(str: String*): Unit = {
-    str.foreach(collect)
-  }
-}