Posted to issues@flink.apache.org by twalthr <gi...@git.apache.org> on 2017/09/26 11:36:23 UTC

[GitHub] flink pull request #4726: [FLINK-7678] [table] Support composite inputs for ...

GitHub user twalthr opened a pull request:

    https://github.com/apache/flink/pull/4726

    [FLINK-7678] [table] Support composite inputs for user-defined functions

    ## What is the purpose of the change
    
    This PR adds tests for rows as inputs for TableFunctions and ScalarFunctions.
    
    ## Brief change log
    
    Tests added
    
    
    ## Verifying this change
    
    This change added tests.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): no
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
    
    ## Documentation
    
      - Does this pull request introduce a new feature? no
      - If yes, how is the feature documented? not applicable
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/twalthr/flink FLINK-7678

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4726.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4726
    
----
commit 2ad9bab75e7e61120b4e5fee4141fd6ffc59d75d
Author: twalthr <tw...@apache.org>
Date:   2017-09-26T10:10:33Z

    [FLINK-7678] [table] Support composite inputs for user-defined functions

----


---

[GitHub] flink pull request #4726: [FLINK-7678] [table] Support composite inputs for ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4726#discussion_r150540850
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala ---
    @@ -89,6 +90,14 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
           "Nullable(f0)",
           "Nullable(f0)",
           "42")
    +
    +    // test row type input
    +    testAllApis(
    +      Func19('f11),
    +      "Func19(f11)",
    +      "Func19(f11)",
    +      "12,true,1,2,3"
    --- End diff --
    
    I see, that makes sense. 
    
    It would be nice to be able to distinguish the nested and flattened case. 
    But that would exceed the scope of this PR.


---

[GitHub] flink pull request #4726: [FLINK-7678] [table] Support composite inputs for ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4726


---

[GitHub] flink issue #4726: [FLINK-7678] [table] Support composite inputs for user-de...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/4726
  
    Merging...


---

[GitHub] flink pull request #4726: [FLINK-7678] [table] Support composite inputs for ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4726#discussion_r150539995
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala ---
    @@ -210,6 +213,31 @@ class CorrelateITCase extends StreamingMultipleProgramsTestBase {
         assertEquals(expected.sorted, StreamITCase.testResults.sorted)
       }
     
    +  @Test
    +  def testRowType(): Unit = {
    +    val row = Row.of(
    +      12.asInstanceOf[Integer],
    +      true.asInstanceOf[JBoolean],
    +      Row.of(1.asInstanceOf[Integer], 2.asInstanceOf[Integer], 3.asInstanceOf[Integer])
    +    )
    +
    +    val rowType = Types.ROW(Types.INT, Types.BOOLEAN, Types.ROW(Types.INT, Types.INT, Types.INT))
    +    val in = env.fromElements(row, row)(rowType).toTable(tEnv).as('a, 'b, 'c)
    +
    +    val tableFunc4 = new TableFunc4()
    +    val result = in
    +      .join(tableFunc4('c) as ('f0, 'f1, 'f2))
    +      .select('c, 'f2)
    +
    +    result.addSink(new StreamITCase.StringSink[Row])
    +    env.execute()
    +
    +    val expected = mutable.MutableList(
    +      "1,2,3,3",
    --- End diff --
    
    Again, it remains nested. This is just the output of the `toString` method of `Row`.


---

[GitHub] flink pull request #4726: [FLINK-7678] [table] Support composite inputs for ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4726#discussion_r150539194
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala ---
    @@ -89,6 +90,14 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
           "Nullable(f0)",
           "Nullable(f0)",
           "42")
    +
    +    // test row type input
    +    testAllApis(
    +      Func19('f11),
    +      "Func19(f11)",
    +      "Func19(f11)",
    +      "12,true,1,2,3"
    --- End diff --
    
    No flattening is happening here. The test calls the `toString` method of `Row`. If the result were flattened, the test base would return only the first field.
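
A minimal sketch of why the expected string alone cannot tell the nested case from the flattened one. `SimpleRow` is a hypothetical stand-in, not Flink's `Row`; it only assumes that fields are joined with "," when rendered, as the expected test output above suggests. Other Flink versions may render rows differently.

```scala
// Hypothetical stand-in for a row type; NOT Flink's Row class.
// Assumption: toString joins the fields with "," and nests recursively.
final case class SimpleRow(fields: Any*) {
  override def toString: String = fields.mkString(",")
}

object RowToStringSketch {
  // A row whose third field is itself a row (nested case).
  val nested: SimpleRow = SimpleRow(12, true, SimpleRow(1, 2, 3))

  // A row with the same values spread over five top-level fields (flattened case).
  val flattened: SimpleRow = SimpleRow(12, true, 1, 2, 3)

  def main(args: Array[String]): Unit = {
    // Both render to the same string, so a string comparison cannot distinguish them.
    println(nested)
    println(flattened)
    // The arity differs: 3 fields when nested, 5 when flattened.
    println(nested.fields.length)
    println(flattened.fields.length)
  }
}
```

Under this assumption, a test that wants to separate the two cases would have to compare arity or field types rather than the rendered string.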


---

[GitHub] flink pull request #4726: [FLINK-7678] [table] Support composite inputs for ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4726#discussion_r142237447
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala ---
    @@ -22,7 +22,7 @@ import java.sql.{Date, Timestamp}
     
     import org.apache.flink.api.scala._
     import org.apache.flink.api.scala.util.CollectionDataSets
    -import org.apache.flink.table.api.TableEnvironment
    +import org.apache.flink.table.api.{TableEnvironment, Types}
    --- End diff --
    
    Undo?


---

[GitHub] flink pull request #4726: [FLINK-7678] [table] Support composite inputs for ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4726#discussion_r142238899
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala ---
    @@ -89,6 +90,14 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
           "Nullable(f0)",
           "Nullable(f0)",
           "42")
    +
    +    // test row type input
    +    testAllApis(
    +      Func19('f11),
    +      "Func19(f11)",
    +      "Func19(f11)",
    +      "12,true,1,2,3"
    --- End diff --
    
    Is the result of a scalar function automatically flattened?


---

[GitHub] flink pull request #4726: [FLINK-7678] [table] Support composite inputs for ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4726#discussion_r142238516
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala ---
    @@ -210,6 +213,31 @@ class CorrelateITCase extends StreamingMultipleProgramsTestBase {
         assertEquals(expected.sorted, StreamITCase.testResults.sorted)
       }
     
    +  @Test
    +  def testRowType(): Unit = {
    +    val row = Row.of(
    +      12.asInstanceOf[Integer],
    +      true.asInstanceOf[JBoolean],
    +      Row.of(1.asInstanceOf[Integer], 2.asInstanceOf[Integer], 3.asInstanceOf[Integer])
    +    )
    +
    +    val rowType = Types.ROW(Types.INT, Types.BOOLEAN, Types.ROW(Types.INT, Types.INT, Types.INT))
    +    val in = env.fromElements(row, row)(rowType).toTable(tEnv).as('a, 'b, 'c)
    +
    +    val tableFunc4 = new TableFunc4()
    +    val result = in
    +      .join(tableFunc4('c) as ('f0, 'f1, 'f2))
    +      .select('c, 'f2)
    +
    +    result.addSink(new StreamITCase.StringSink[Row])
    +    env.execute()
    +
    +    val expected = mutable.MutableList(
    +      "1,2,3,3",
    --- End diff --
    
    Is this the correct result? Shouldn't `'c` remain nested? We did not ask to flatten it.


---