Posted to issues@flink.apache.org by twalthr <gi...@git.apache.org> on 2017/09/26 11:36:23 UTC
[GitHub] flink pull request #4726: [FLINK-7678] [table] Support composite inputs for ...
GitHub user twalthr opened a pull request:
https://github.com/apache/flink/pull/4726
[FLINK-7678] [table] Support composite inputs for user-defined functions
## What is the purpose of the change
This PR adds tests for rows as inputs for TableFunctions and ScalarFunctions.
## Brief change log
Tests added
## Verifying this change
This change added tests.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
## Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/twalthr/flink FLINK-7678
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/4726.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #4726
----
commit 2ad9bab75e7e61120b4e5fee4141fd6ffc59d75d
Author: twalthr <tw...@apache.org>
Date: 2017-09-26T10:10:33Z
[FLINK-7678] [table] Support composite inputs for user-defined functions
----
---
[GitHub] flink pull request #4726: [FLINK-7678] [table] Support composite inputs for ...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/4726#discussion_r150540850
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala ---
@@ -89,6 +90,14 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
"Nullable(f0)",
"Nullable(f0)",
"42")
+
+ // test row type input
+ testAllApis(
+ Func19('f11),
+ "Func19(f11)",
+ "Func19(f11)",
+ "12,true,1,2,3"
--- End diff --
I see, that makes sense.
It would be nice to be able to distinguish the nested and flattened cases.
But that would exceed the scope of this PR.
---
[GitHub] flink pull request #4726: [FLINK-7678] [table] Support composite inputs for ...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/4726
---
[GitHub] flink issue #4726: [FLINK-7678] [table] Support composite inputs for user-de...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:
https://github.com/apache/flink/pull/4726
Merging...
---
[GitHub] flink pull request #4726: [FLINK-7678] [table] Support composite inputs for ...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/4726#discussion_r150539995
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala ---
@@ -210,6 +213,31 @@ class CorrelateITCase extends StreamingMultipleProgramsTestBase {
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
+ @Test
+ def testRowType(): Unit = {
+ val row = Row.of(
+ 12.asInstanceOf[Integer],
+ true.asInstanceOf[JBoolean],
+ Row.of(1.asInstanceOf[Integer], 2.asInstanceOf[Integer], 3.asInstanceOf[Integer])
+ )
+
+ val rowType = Types.ROW(Types.INT, Types.BOOLEAN, Types.ROW(Types.INT, Types.INT, Types.INT))
+ val in = env.fromElements(row, row)(rowType).toTable(tEnv).as('a, 'b, 'c)
+
+ val tableFunc4 = new TableFunc4()
+ val result = in
+ .join(tableFunc4('c) as ('f0, 'f1, 'f2))
+ .select('c, 'f2)
+
+ result.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "1,2,3,3",
--- End diff --
Again, it remains nested. This is just the `toString` method of `Row`.
---
[GitHub] flink pull request #4726: [FLINK-7678] [table] Support composite inputs for ...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/4726#discussion_r150539194
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala ---
@@ -89,6 +90,14 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
"Nullable(f0)",
"Nullable(f0)",
"42")
+
+ // test row type input
+ testAllApis(
+ Func19('f11),
+ "Func19(f11)",
+ "Func19(f11)",
+ "12,true,1,2,3"
--- End diff --
No flattening is happening here. The test base calls the `toString` method of `Row`. If the result were flattened, the test base would return only the first field.
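A minimal sketch of why the expected string alone cannot tell the nested and flattened cases apart. It assumes that `Row.toString` joins its fields with commas and adds no brackets around a nested row. `SimpleRow` is a hypothetical stand-in written for this illustration, not Flink's `Row` class.

```scala
// Hypothetical stand-in for a row type, for illustration only.
// Assumption: fields are printed comma-separated with no brackets,
// so the fields of a nested row appear inline in the output.
final case class SimpleRow(fields: Any*) {
  def arity: Int = fields.length
  override def toString: String = fields.mkString(",")
}

object RowToStringSketch {
  // A row with three fields, the third of which is itself a row.
  val nested: SimpleRow = SimpleRow(12, true, SimpleRow(1, 2, 3))
  // The same values flattened into a single row with five fields.
  val flattened: SimpleRow = SimpleRow(12, true, 1, 2, 3)

  def main(args: Array[String]): Unit = {
    // Both rows render to the same string under the assumption above.
    println(nested)
    println(flattened)
    // The arity still differs, which is what separates the two cases.
    println(s"nested arity = ${nested.arity}, flattened arity = ${flattened.arity}")
  }
}
```

Under that assumption, comparing string output cannot detect flattening; a check on the arity or the field types would be needed to distinguish the two cases.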
---
[GitHub] flink pull request #4726: [FLINK-7678] [table] Support composite inputs for ...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/4726#discussion_r142237447
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala ---
@@ -22,7 +22,7 @@ import java.sql.{Date, Timestamp}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.{TableEnvironment, Types}
--- End diff --
Undo?
---
[GitHub] flink pull request #4726: [FLINK-7678] [table] Support composite inputs for ...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/4726#discussion_r142238899
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala ---
@@ -89,6 +90,14 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
"Nullable(f0)",
"Nullable(f0)",
"42")
+
+ // test row type input
+ testAllApis(
+ Func19('f11),
+ "Func19(f11)",
+ "Func19(f11)",
+ "12,true,1,2,3"
--- End diff --
Is the result of a scalar function automatically flattened?
---
[GitHub] flink pull request #4726: [FLINK-7678] [table] Support composite inputs for ...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/4726#discussion_r142238516
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala ---
@@ -210,6 +213,31 @@ class CorrelateITCase extends StreamingMultipleProgramsTestBase {
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
+ @Test
+ def testRowType(): Unit = {
+ val row = Row.of(
+ 12.asInstanceOf[Integer],
+ true.asInstanceOf[JBoolean],
+ Row.of(1.asInstanceOf[Integer], 2.asInstanceOf[Integer], 3.asInstanceOf[Integer])
+ )
+
+ val rowType = Types.ROW(Types.INT, Types.BOOLEAN, Types.ROW(Types.INT, Types.INT, Types.INT))
+ val in = env.fromElements(row, row)(rowType).toTable(tEnv).as('a, 'b, 'c)
+
+ val tableFunc4 = new TableFunc4()
+ val result = in
+ .join(tableFunc4('c) as ('f0, 'f1, 'f2))
+ .select('c, 'f2)
+
+ result.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "1,2,3,3",
--- End diff --
Is this the correct result? Shouldn't `'c` remain nested? We did not ask to flatten it.
---