Posted to issues@flink.apache.org by sunjincheng121 <gi...@git.apache.org> on 2017/02/25 15:49:49 UTC

[GitHub] flink pull request #3418: [FLINK-5776][Table API & SQL]Improve XXMapRunner s...

GitHub user sunjincheng121 opened a pull request:

    https://github.com/apache/flink/pull/3418

    [FLINK-5776][Table API & SQL]Improve XXMapRunner support create insta…

    …nce by carrying constructor parameters
    
    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [x] General
      - The pull request references the related JIRA issue ("[FLINK-5776] [Table API & SQL]Improve XXMapRunner support create instance by carrying constructor parameters")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [x] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sunjincheng121/flink FLINK-5776-PR

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3418.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3418
    
----
commit 3339ef8d41643aafacf93b7cb9a2b86251e7b632
Author: 金竹 <ji...@alibaba-inc.com>
Date:   2017-02-21T03:35:35Z

    [FLINK-5776][Table API & SQL]Improve XXMapRunner support create instance by carrying constructor parameters

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3418: [FLINK-5776][Table API & SQL]Improve XXMapRunner s...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3418#discussion_r103226511
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -1562,6 +1561,39 @@ class CodeGenerator(
       }
     
       /**
    +    * Adds a reusable constructor statement with the given parameter types,
    +    * and the member variable's name that stores the variables passed in the constructor.
    +    *
    +    * @param memberVariables The member variable's name that stores the variables passed in
    +    *                        the constructor.
    +    * @param parameterTypes The parameter types to construct the function
    +    */
    +  def addReusableConstructorWithMemberVariables(
    --- End diff --
    
    This method basically does the same as `addReusableConstructor`, except that it allows hardcoding the names of the member variables. IMO, the same functionality can be achieved (in a safe manner) by reusing existing code.
    
    For instance, the code of the test could be rewritten as:
    
    ```
    val params = generator.addReusableConstructor(classOf[Long])
    val card = params(0)
    
    val body =
      s"""
         |return java.lang.Long.valueOf(in1) + java.lang.Long.valueOf($card);
         """.stripMargin
    ```
    
    We would have to remove the `null` assignment for the initially empty field variable in `addReusableConstructor` though (or check whether it is a primitive type), because a Java field of a primitive type such as `long` cannot be initialized with `null`.
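    A minimal plain-Scala sketch of the idea behind this suggestion: the generator, not the caller, picks the member-variable names and hands them back, so generated code can only refer to names that actually exist. `MiniCodeGen`, `newName` and the generated Java layout are hypothetical and are not Flink's actual `CodeGenerator`; the sketch also declares the fields without any initializer.

    ```scala
    import scala.collection.mutable

    // Hypothetical, heavily simplified generator. It is NOT Flink's CodeGenerator;
    // it only shows the generator choosing member names and returning them.
    class MiniCodeGen {
      private val members = mutable.ArrayBuffer.empty[String]
      private val ctorParams = mutable.ArrayBuffer.empty[String]
      private val ctorAssignments = mutable.ArrayBuffer.empty[String]
      private var counter = 0

      // Returns a fresh identifier such as "param$0", so names cannot collide.
      private def newName(prefix: String): String = {
        val name = s"$prefix$$$counter"
        counter += 1
        name
      }

      // Registers one constructor argument per type and returns the names of
      // the member fields that will hold the passed-in values.
      def addReusableConstructor(parameterTypes: Class[_]*): Array[String] =
        parameterTypes.map { tpe =>
          val field = newName("param")
          // No "= null" initializer: it would not compile for primitives like long.
          members += s"private final ${tpe.getName} $field;"
          ctorParams += s"${tpe.getName} $field"
          ctorAssignments += s"this.$field = $field;"
          field
        }.toArray

      // Assembles the Java source of the generated class around a method body.
      def generateClass(className: String, methodBody: String): String =
        s"""public class $className {
           |  ${members.mkString("\n  ")}
           |  public $className(${ctorParams.mkString(", ")}) {
           |    ${ctorAssignments.mkString("\n    ")}
           |  }
           |  public long map(long in1) {
           |    $methodBody
           |  }
           |}""".stripMargin
    }
    ```

    Usage mirrors the snippet above: call `val card = gen.addReusableConstructor(classOf[Long])(0)` and interpolate `card` into the body, instead of hardcoding a name such as `cardinal`.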


[GitHub] flink pull request #3418: [FLINK-5776][Table API & SQL]Improve XXMapRunner s...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3418#discussion_r103127459
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/MapRunnerTest.scala ---
    @@ -0,0 +1,156 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.api.common.functions.{MapFunction, FlatMapFunction, RichFlatMapFunction,
    +RichMapFunction}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +import org.apache.flink.table.api.TableConfig
    +import org.apache.flink.table.api.scala.stream.utils.StreamITCase
    +import org.apache.flink.table.runtime.MapRunnerTest.StringSink
    +import org.apache.flink.table.codegen.CodeGenerator
    +import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
    +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
    +
    +import org.junit.Test
    +import org.junit.Assert.{assertEquals, _}
    +
    +import java.util.Collections
    +
    +import scala.collection.mutable
    +import scala.collection.JavaConverters._
    +
    +class MapRunnerTest extends StreamingMultipleProgramsTestBase {
    --- End diff --
    
    Actually, tests in this class will set up a local cluster to execute, so `MapRunnerITCase` may be a better name than `MapRunnerTest`. 
    
    IMHO, adding two IT cases to test this functionality is a little heavy. The logic is very simple and most of the code is covered by CorrelateITCase, so maybe we don't need to add tests for this issue. Alternatively, we could add a unit test that code-generates a map/flatmap function wrapped in the runner and calls the runner directly, without setting up a cluster. 


[GitHub] flink issue #3418: [FLINK-5776][Table API & SQL]Improve XXMapRunner support ...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on the issue:

    https://github.com/apache/flink/pull/3418
  
    Hi @wuchong, thanks a lot for your review. I have changed the test to a unit test. 
    Best,
    SunJincheng


[GitHub] flink pull request #3418: [FLINK-5776][Table API & SQL]Improve XXMapRunner s...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3418#discussion_r103126204
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/MapRunnerTest.scala ---
    @@ -0,0 +1,156 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.api.common.functions.{MapFunction, FlatMapFunction, RichFlatMapFunction,
    +RichMapFunction}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +import org.apache.flink.table.api.TableConfig
    +import org.apache.flink.table.api.scala.stream.utils.StreamITCase
    +import org.apache.flink.table.runtime.MapRunnerTest.StringSink
    +import org.apache.flink.table.codegen.CodeGenerator
    +import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
    +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
    +
    +import org.junit.Test
    +import org.junit.Assert.{assertEquals, _}
    +
    +import java.util.Collections
    +
    +import scala.collection.mutable
    +import scala.collection.JavaConverters._
    +
    +class MapRunnerTest extends StreamingMultipleProgramsTestBase {
    +
    +  @Test
    +  def testMapRunnerWithConstructorParameter(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val ds: DataStream[Long] = env.generateSequence(0, 5)
    +
    +    MapRunnerTest.clear
    +
    +    val body =
    +      s"""
    +         |return java.lang.Long.valueOf(in1) + java.lang.Long.valueOf(cardinal);
    +         """.stripMargin
    +
    +    val generator =
    +      new CodeGenerator(
    +        TableConfig.DEFAULT,
    +        false,
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]])
    +
    +    generator.addReusableConstructorWithParameterNames(Array[String]("cardinal"), classOf[Long])
    +
    +    val genFunction = generator.generateFunction(
    +      "myMapFunction",
    +      classOf[MapFunction[Long, Long]],
    +      body,
    +      BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]])
    +
    +    val data = Array(java.lang.Long.valueOf(10))
    +    val mapRunner =
    +      new MapRunner[Long, Long](
    +        genFunction.name,
    +        genFunction.code,
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
    +        data,
    +        Array[Class[_]](classOf[Long])
    +      ).asInstanceOf[RichMapFunction[Long, Long]]
    +
    +    ds.map(mapRunner).addSink(new StringSink())
    +
    +    env.execute()
    +
    +    val expected = mutable.MutableList("10", "11", "12", "13", "14", "15")
    +    assertEquals(expected.sorted, MapRunnerTest.testResults.sorted)
    +  }
    +
    +  @Test
    +  def testFlatMapRunnerWithConstructorParameter(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val ds: DataStream[Long] = env.generateSequence(0, 5)
    +
    +    MapRunnerTest.clear
    +
    +    val body =
    +      s"""
    +         | c.collect(java.lang.Long.valueOf(in1));
    +         | c.collect(java.lang.Long.valueOf(cardinal));
    +         """.stripMargin
    +
    +    val generator =
    +      new CodeGenerator(
    +        TableConfig.DEFAULT,
    +        false,
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]])
    +
    +    generator.addReusableConstructorWithParameterNames(Array[String]("cardinal"), classOf[Long])
    +
    +    val genFunction = generator.generateFunction(
    +      "myFlatMapFunction",
    +      classOf[FlatMapFunction[Long, Long]],
    +      body,
    +      BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]])
    +
    +    val data = Array(java.lang.Long.valueOf(10))
    +    val mapRunner =
    +      new FlatMapRunner[Long, Long](
    +        genFunction.name,
    +        genFunction.code,
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
    +        data,
    +        Array[Class[_]](classOf[Long])
    +      ).asInstanceOf[RichFlatMapFunction[Long, Long]]
    +
    +    ds.flatMap(mapRunner).addSink(new StringSink())
    +
    +    env.execute()
    +
    +    val expected =
    +      mutable.MutableList("0", "1", "2", "3", "4", "5", "10", "10", "10", "10", "10", "10")
    +    assertEquals(expected.sorted, MapRunnerTest.testResults.sorted)
    +  }
    +
    +}
    +
    +object MapRunnerTest {
    --- End diff --
    
    Can we use `StreamITCase` instead of this? It seems to be the same as `StreamITCase`.
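    Both companion objects appear to follow the same pattern: a static, shared result buffer that test sink instances append to, which the test clears before running the job and compares afterwards. A minimal plain-Scala sketch of that pattern; `ResultCollector` and its methods are hypothetical stand-ins for `MapRunnerTest`/`StreamITCase`, not their actual code.

    ```scala
    import scala.collection.mutable

    // Hypothetical stand-in for the static result buffer used by test sinks
    // (the role MapRunnerTest.testResults / StreamITCase.testResults play).
    object ResultCollector {
      private val testResults = mutable.ArrayBuffer.empty[String]

      // Called by the test before executing the job.
      def clear(): Unit = testResults.synchronized { testResults.clear() }

      // Called by every sink instance; parallel instances may call it concurrently.
      def add(value: Any): Unit = testResults.synchronized {
        testResults += value.toString
        ()
      }

      // Sorted snapshot, since parallel sinks append in no particular order.
      def sortedResults: List[String] = testResults.synchronized { testResults.toList.sorted }
    }
    ```

    The `synchronized` guard and the sorted comparison are what make such a buffer usable with parallel sinks, which is also why the original tests compare `expected.sorted` against the sorted results.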


[GitHub] flink issue #3418: [FLINK-5776][Table API & SQL]Improve XXMapRunner support ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/3418
  
    The changes look good to me. However, I also think that such changes should be part of a PR that actually solves a problem. This PR only adds logic that is not needed yet.


[GitHub] flink pull request #3418: [FLINK-5776][Table API & SQL]Improve XXMapRunner s...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3418#discussion_r103127935
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala ---
    @@ -43,7 +45,15 @@ class FlatMapRunner[IN, OUT](
         LOG.debug(s"Compiling FlatMapFunction: $name \n\n Code:\n$code")
         val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
         LOG.debug("Instantiating FlatMapFunction.")
    -    function = clazz.newInstance()
    +    if (null == parameterValues || null == parameterTypes ||
    +      parameterValues.length == 0 || parameterTypes.length == 0 ||
    +      parameterTypes.length != parameterValues.length) {
    --- End diff --
    
    I think we do not need to check `parameterValues.length == 0`: if the parameter length is 0, it is also handled correctly by the `else` branch. Also, I think we should not check `parameterTypes.length != parameterValues.length`; in that case, throwing an exception would be better than silently ignoring the parameters and using the no-argument constructor.
    
    So I think `null == parameterValues || null == parameterTypes` is enough and simpler. 
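    A minimal sketch of the simplified check, assuming plain JDK reflection and using `java.lang.StringBuilder` as a stand-in for the compiled function class; the helper `RunnerInstantiation.instantiate` is hypothetical. With only the `null` check, empty arrays still go through `getConstructor()` and resolve to the no-argument constructor, while passing more or fewer values than the constructor declares surfaces as an `IllegalArgumentException` from `Constructor.newInstance` instead of being silently ignored.

    ```scala
    object RunnerInstantiation {
      // Hypothetical helper: only null arrays take the no-arg path explicitly.
      // Empty arrays fall through to getConstructor(), which also resolves the
      // no-arg constructor; mismatched arrays fail loudly inside newInstance.
      def instantiate[T](
          clazz: Class[T],
          parameterTypes: Array[Class[_]],
          parameterValues: Array[AnyRef]): T =
        if (null == parameterValues || null == parameterTypes) {
          // getConstructor().newInstance() instead of the deprecated Class.newInstance()
          clazz.getConstructor().newInstance()
        } else {
          clazz
            .getConstructor(parameterTypes: _*)
            .newInstance(parameterValues: _*)
        }
    }
    ```

    For example, `instantiate(classOf[java.lang.StringBuilder], Array[Class[_]](classOf[String]), Array[AnyRef]("abc"))` builds a `StringBuilder` containing `abc`.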


[GitHub] flink pull request #3418: [FLINK-5776][Table API & SQL]Improve XXMapRunner s...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3418#discussion_r103631417
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala ---
    @@ -30,7 +30,9 @@ import org.slf4j.LoggerFactory
     class FlatMapRunner[IN, OUT](
         name: String,
         code: String,
    -    @transient returnType: TypeInformation[OUT])
    +    @transient returnType: TypeInformation[OUT],
    +    parameterValues: Array[_ <: Object] = null,
    --- End diff --
    
    Isn't Object too general? I think it needs to be `Serializable`.
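    Flink ships function objects, including their fields, to the workers using Java serialization, so a parameter array typed `Array[_ <: Object]` only reveals a non-serializable value at runtime. A minimal sketch of the difference, with hypothetical helper names: `roundTrip` roughly mimics that serialization step, and `checkedParams` uses a `Serializable` bound so the compiler can reject such values up front.

    ```scala
    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

    object SerializationCheck {
      // Round-trips a value through Java serialization, roughly the step a
      // function object and its fields go through before reaching the workers.
      def roundTrip[T <: java.io.Serializable](value: T): T = {
        val bytes = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(bytes)
        out.writeObject(value) // throws NotSerializableException for bad elements
        out.flush()
        val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
        in.readObject().asInstanceOf[T]
      }

      // With a Serializable bound, a call like checkedParams(new Object) does not compile.
      def checkedParams(values: java.io.Serializable*): Array[java.io.Serializable] =
        values.toArray
    }
    ```

    An `Array[AnyRef]` holding a plain `new Object` still type-checks as serializable (arrays themselves are), and only fails once `writeObject` reaches the element, which is the runtime surprise a `Serializable` element bound avoids.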


[GitHub] flink issue #3418: [FLINK-5776][Table API & SQL]Improve XXMapRunner support ...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on the issue:

    https://github.com/apache/flink/pull/3418
  
    Thanks for updating, the code looks good to me. 
    
    +1


[GitHub] flink issue #3418: [FLINK-5776][Table API & SQL]Improve XXMapRunner support ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3418
  
    Hmm, I see. I'm usually not in favor of adding code without a clear need. 
    On the other hand, these changes are not very extensive and I can see how it might help to code-gen the aggregation functions.
    
    @twalthr what do you think?


[GitHub] flink pull request #3418: [FLINK-5776][Table API & SQL]Improve XXMapRunner s...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3418#discussion_r103206923
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala ---
    @@ -41,7 +43,13 @@ class MapRunner[IN, OUT](
         LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
         val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
         LOG.debug("Instantiating MapFunction.")
    -    function = clazz.newInstance()
    +    if (null == parameterValues || null == parameterTypes) {
    --- End diff --
    
    Same as `FlatMapRunner`


[GitHub] flink pull request #3418: [FLINK-5776][Table API & SQL]Improve XXMapRunner s...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 closed the pull request at:

    https://github.com/apache/flink/pull/3418


[GitHub] flink pull request #3418: [FLINK-5776][Table API & SQL]Improve XXMapRunner s...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3418#discussion_r103206651
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala ---
    @@ -43,7 +45,13 @@ class FlatMapRunner[IN, OUT](
         LOG.debug(s"Compiling FlatMapFunction: $name \n\n Code:\n$code")
         val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
         LOG.debug("Instantiating FlatMapFunction.")
    -    function = clazz.newInstance()
    +    if (null == parameterValues || null == parameterTypes) {
    --- End diff --
    
    Move `function =` out of the `if` expression, i.e.,
    ```
    function = if (null == parameterValues || null == parameterTypes) {
      clazz.newInstance()
    } else {
      clazz
        .getConstructor(parameterTypes: _*)
        .newInstance(parameterValues: _*)
    }
    ```


[GitHub] flink issue #3418: [FLINK-5776][Table API & SQL]Improve XXMapRunner support ...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on the issue:

    https://github.com/apache/flink/pull/3418
  
    @fhueske thanks for your review. I have updated the PR.
    IMO, the main purpose of this PR is to enhance the functionality of Flat/MapRunner. In addition, my next plan is to use code generation to generate the classes in the `org.apache.flink.table.runtime.aggregate` package, and this PR may help with that work. What do you think?


