Posted to issues@flink.apache.org by sunjincheng121 <gi...@git.apache.org> on 2017/02/16 03:50:19 UTC

[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

GitHub user sunjincheng121 opened a pull request:

    https://github.com/apache/flink/pull/3330

    [FLINK-5795][TableAPI&SQL] Improve UDTF to support constructor with parameter.
    
    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [x] General
      - The pull request references the related JIRA issue ("[FLINK-5795] Improve "UDTF" to support constructor with parameter.")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [x] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sunjincheng121/flink FLINK-5795-PR

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3330.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3330
    
----
commit da788d627527f7a0547c23d50ba916f625b66aeb
Author: 金竹 <ji...@alibaba-inc.com>
Date:   2017-02-14T06:43:41Z

    [FLINK-5795][TableAPI&SQL] Improve UDTF to support constructor with parameter.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support constr...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on the issue:

    https://github.com/apache/flink/pull/3330
  
    Yes @fhueske, I would like to help merge it. Is there anything I need to pay attention to?


[GitHub] flink issue #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support constr...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on the issue:

    https://github.com/apache/flink/pull/3330
  
    Looks good to me. Waiting for another committer's +1.


[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r101445785
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
    @@ -324,4 +325,19 @@ object UserDefinedFunctionUtils {
         candidate == classOf[Time] && (expected == classOf[Int] || expected == classOf[JInt]) ||
         candidate == classOf[Timestamp] && (expected == classOf[Long] || expected == classOf[JLong])
     
    +  @throws[Exception]
    +  def serialize(function: UserDefinedFunction): String = {
    +    val byteArrayOutPut = new ByteArrayOutputStream
    +    val objectOutPut = new ObjectOutputStream(byteArrayOutPut)
    +    objectOutPut.writeObject(function)
    +    objectOutPut.close()
    +    Base64.encodeBase64URLSafeString(byteArrayOutPut.toByteArray)
    +  }
    +
    +  @throws[Exception]
    +  def deserialize(data: String): UserDefinedFunction = {
    +    val byteData = Base64.decodeBase64(data)
    +    new ObjectInputStream(
    --- End diff --
    
    You can get the object from the byte array with `InstantiationUtil.deserializeObject`.
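
For reference, a minimal standalone Scala sketch of the serialize/deserialize round trip in the diff above, assuming only the JDK: `java.util.Base64`'s URL-safe encoder without padding stands in for commons-codec's `Base64.encodeBase64URLSafeString`, and the overridden `resolveClass` loosely imitates class-loader-aware deserialization of the kind Flink's `InstantiationUtil` offers. `Multiplier` and `FunctionCodec` are hypothetical names, not Flink classes.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream, ObjectStreamClass, Serializable}
import java.util.Base64

// Hypothetical stand-in for a UDF whose behaviour depends on a constructor parameter.
class Multiplier(val factor: Int) extends Serializable {
  def eval(x: Int): Int = x * factor
}

object FunctionCodec {
  // Java-serializes the object, then encodes the bytes as URL-safe Base64 without padding
  // (a single line containing no '+', '/', '=' or CRLF characters).
  def serialize(obj: Serializable): String = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    Base64.getUrlEncoder.withoutPadding.encodeToString(bytes.toByteArray)
  }

  // Decodes the string and reads the object back, resolving classes with this object's class loader
  // and falling back to the default resolution (e.g. for primitive types).
  def deserialize[T](data: String): T = {
    val loader = getClass.getClassLoader
    val in = new ObjectInputStream(new ByteArrayInputStream(Base64.getUrlDecoder.decode(data))) {
      override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
        try Class.forName(desc.getName, false, loader)
        catch { case _: ClassNotFoundException => super.resolveClass(desc) }
    }
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}

object FunctionCodecDemo {
  def main(args: Array[String]): Unit = {
    val encoded = FunctionCodec.serialize(new Multiplier(3))
    val restored = FunctionCodec.deserialize[Multiplier](encoded)
    println(restored.eval(14)) // prints 42
  }
}
```

Deserializing with an explicit class loader matters when the UDF class is not visible to the loader that `ObjectInputStream` would otherwise pick by default.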


[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r101456205
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -1463,21 +1465,23 @@ class CodeGenerator(
         */
       def addReusableFunction(function: UserDefinedFunction): String = {
         val classQualifier = function.getClass.getCanonicalName
    -    val fieldTerm = s"function_${classQualifier.replace('.', '$')}"
    +    val functionSerializedData = serialize(function)
    +    val fieldTerm =
    +      s"""
    +         |function_${classQualifier.replace('.', '$')}_${DigestUtils.md5Hex(functionSerializedData)}
    --- End diff --
    
    Yes, the UDTF node is independent of the code generation, so this rarely happens with UDTFs. But it often happens when multiple ScalarFunctions are involved (assuming ScalarFunction is implemented the same way).


[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r101763661
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -1463,21 +1465,23 @@ class CodeGenerator(
         */
       def addReusableFunction(function: UserDefinedFunction): String = {
         val classQualifier = function.getClass.getCanonicalName
    -    val fieldTerm = s"function_${classQualifier.replace('.', '$')}"
    +    val functionSerializedData = serialize(function)
    +    val fieldTerm =
    +      s"""
    +         |function_${classQualifier.replace('.', '$')}_${DigestUtils.md5Hex(functionSerializedData)}
    +       """.stripMargin
     
         val fieldFunction =
           s"""
             |transient $classQualifier $fieldTerm = null;
             |""".stripMargin
         reusableMemberStatements.add(fieldFunction)
     
    -    val constructorTerm = s"constructor_${classQualifier.replace('.', '$')}"
         val constructorAccessibility =
    --- End diff --
    
    Rename the variable to something like `functionDeserialization`.
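
The diff above drops the `constructorTerm` and moves toward deserializing the function instead. Assuming the previous generated code re-created the function through a no-argument constructor (as the removed `constructorTerm` suggests), this hypothetical Scala sketch shows why that path cannot carry constructor parameters while deserialization can. `Split` and `InstantiationSketch` are illustrative names, not Flink APIs.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream, ObjectStreamClass, Serializable}
import java.util.regex.Pattern

// Hypothetical table function that only has a parameterized constructor.
class Split(val separator: String) extends Serializable {
  def eval(line: String): Seq[String] = line.split(Pattern.quote(separator)).toSeq
}

object InstantiationSketch {
  // Re-creates the function from its class alone: fails without a no-arg constructor,
  // and could never recover `separator` anyway.
  def viaNoArgConstructor(function: AnyRef): Either[String, AnyRef] =
    try Right(function.getClass.getDeclaredConstructor().newInstance(): AnyRef)
    catch { case _: NoSuchMethodException => Left(s"${function.getClass.getName} has no no-arg constructor") }

  // Re-creates the function from its serialized bytes, so constructor arguments survive.
  def viaDeserialization[T <: Serializable](function: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(function) finally out.close()
    val loader = getClass.getClassLoader
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray)) {
      override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
        try Class.forName(desc.getName, false, loader)
        catch { case _: ClassNotFoundException => super.resolveClass(desc) }
    }
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}

object InstantiationDemo {
  def main(args: Array[String]): Unit = {
    println(InstantiationSketch.viaNoArgConstructor(new Split("#")).isLeft) // prints true
    println(InstantiationSketch.viaDeserialization(new Split("#")).eval("a#b#c").mkString(",")) // prints a,b,c
  }
}
```

The reflective path fails outright here; even for a class that did have a no-arg constructor, it would yield a default instance rather than the configured one.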


[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r101450383
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
    @@ -324,4 +325,19 @@ object UserDefinedFunctionUtils {
         candidate == classOf[Time] && (expected == classOf[Int] || expected == classOf[JInt]) ||
         candidate == classOf[Timestamp] && (expected == classOf[Long] || expected == classOf[JLong])
     
    +  @throws[Exception]
    +  def serialize(function: UserDefinedFunction): String = {
    +    val byteArrayOutPut = new ByteArrayOutputStream
    --- End diff --
    
    Cool, good catch. It's the same code, but using `InstantiationUtil` is a good way.


[GitHub] flink issue #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support constr...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on the issue:

    https://github.com/apache/flink/pull/3330
  
    Thanks @sunjincheng121 for the update. Could you rebase the code on master?
    
    After that, I can help merge this.


[GitHub] flink issue #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support constr...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on the issue:

    https://github.com/apache/flink/pull/3330
  
    @fhueske @wuchong I have added the UDF implementation.


[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r101906986
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -1463,21 +1465,23 @@ class CodeGenerator(
         */
       def addReusableFunction(function: UserDefinedFunction): String = {
         val classQualifier = function.getClass.getCanonicalName
    -    val fieldTerm = s"function_${classQualifier.replace('.', '$')}"
    +    val functionSerializedData = serialize(function)
    +    val fieldTerm =
    +      s"""
    +         |function_${classQualifier.replace('.', '$')}_${DigestUtils.md5Hex(functionSerializedData)}
    --- End diff --
    
    Thanks @fhueske.
    Regarding the name collisions: the current implementation covers both UDTF and UDF.
    However, the creation path from the Table API to `SqlFunction` still needs some work, because the UDF parameters currently do not take effect there. I only ran a simple test and did not look into the cause carefully (I would like to address it in FLINK-5795). Feel free to let me know if you also want to merge FLINK-5794 into this PR. :)
    
    Thanks, @wuchong.
    1. `md5Hex` is used to handle functions that carry constructor parameters. Without `md5Hex`, one function object would overwrite another (they would share the same field name), and the results would be incorrect.
    2. `CodeGenUtils.newName` does not work well here, because it uses `AtomicInteger.getAndIncrement` to generate the name suffix. When we use multiple UDFs with the same state, it creates multiple UDF objects, although a single shared object is sufficient. For example:
    ```
        tEnv.registerFunction("func0", new Func13)
        tEnv.registerFunction("func1", new Func13)
        tEnv.registerFunction("func2", new Func13)
    ```
    Here `reusableMemberStatements` will contain three different elements that cannot be deduplicated, even though one shared object would be enough.
    
    What do you think, @fhueske @wuchong?
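
The naming argument in point 2 can be sketched in plain Scala. This is a hypothetical illustration, not Flink's `CodeGenerator`: `counterTerm` mimics counter-based naming in the spirit of `CodeGenUtils.newName`, `digestTerm` appends an MD5 hex digest of the serialized function (computed with `java.security.MessageDigest` on the raw bytes, rather than `DigestUtils.md5Hex` on the Base64 string), and a `LinkedHashSet` stands in for `reusableMemberStatements`. `PrefixFunc` and `NamingSketch` are made-up names.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream, Serializable}
import java.security.MessageDigest
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

// Hypothetical UDF: instances built with the same argument have the same serialized state.
class PrefixFunc(val prefix: String) extends Serializable {
  def eval(s: String): String = prefix + s
}

object NamingSketch {
  private val counter = new AtomicInteger(0)

  private def serializedBytes(function: Serializable): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(function) finally out.close()
    bytes.toByteArray
  }

  private def md5Hex(data: Array[Byte]): String =
    MessageDigest.getInstance("MD5").digest(data).map(b => f"${b & 0xff}%02x").mkString

  private def baseName(function: AnyRef): String =
    "function_" + function.getClass.getName.replace('.', '$')

  // Counter-based naming: every call produces a fresh term, even for equal functions.
  def counterTerm(function: AnyRef): String = s"${baseName(function)}_${counter.getAndIncrement()}"

  // Digest-based naming: equal serialized state produces the same term.
  def digestTerm(function: Serializable): String =
    s"${baseName(function)}_${md5Hex(serializedBytes(function))}"

  // Member declarations collected in a set, so identical declarations collapse into one.
  def memberStatements(className: String, terms: Seq[String]): mutable.LinkedHashSet[String] =
    mutable.LinkedHashSet.empty[String] ++= terms.map(t => s"transient $className $t = null;")
}

object NamingDemo {
  def main(args: Array[String]): Unit = {
    val functions = Seq(new PrefixFunc("x"), new PrefixFunc("x"), new PrefixFunc("x"))
    println(NamingSketch.memberStatements("PrefixFunc", functions.map(f => NamingSketch.counterTerm(f))).size) // prints 3
    println(NamingSketch.memberStatements("PrefixFunc", functions.map(f => NamingSketch.digestTerm(f))).size) // prints 1
  }
}
```

With counter-based names, three equal functions get three member fields; with digest-based names the set keeps a single declaration, while functions built with different arguments still get distinct names.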


[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3330


[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r102032869
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ScalarFunction.scala ---
    @@ -58,6 +60,11 @@ abstract class ScalarFunction extends UserDefinedFunction {
     
       override def toString: String = getClass.getCanonicalName
     
    +  final def functionIdentifier: String = {
    --- End diff --
    
    Can `functionIdentifier` be the default implementation in UserDefinedFunction? I think it would also work for AggregateFunction in the future.


[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r102035404
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ScalarFunction.scala ---
    @@ -58,6 +60,11 @@ abstract class ScalarFunction extends UserDefinedFunction {
     
       override def toString: String = getClass.getCanonicalName
     
    +  final def functionIdentifier: String = {
    --- End diff --
    
    To keep the Java interface and the Scala trait consistent, I prefer not to add any implementation to UserDefinedFunction.


[GitHub] flink issue #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support constr...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/3330
  
    One last comment (maybe as a follow-up issue): we should also update the documentation for this change.


[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r101445544
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
    @@ -324,4 +325,19 @@ object UserDefinedFunctionUtils {
         candidate == classOf[Time] && (expected == classOf[Int] || expected == classOf[JInt]) ||
         candidate == classOf[Timestamp] && (expected == classOf[Long] || expected == classOf[JLong])
     
    +  @throws[Exception]
    +  def serialize(function: UserDefinedFunction): String = {
    +    val byteArrayOutPut = new ByteArrayOutputStream
    +    val objectOutPut = new ObjectOutputStream(byteArrayOutPut)
    +    objectOutPut.writeObject(function)
    +    objectOutPut.close()
    +    Base64.encodeBase64URLSafeString(byteArrayOutPut.toByteArray)
    --- End diff --
    
    Isn't `Base64.encodeBase64String` sufficient for this?


[GitHub] flink issue #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support constr...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3330
  
    Thanks for the update @sunjincheng121! 
    +1 to merge.
    @wuchong do you want to merge this PR?


[GitHub] flink issue #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support constr...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on the issue:

    https://github.com/apache/flink/pull/3330
  
    Thanks @wuchong, I'll rebase the code.


[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r101775264
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -1463,21 +1465,23 @@ class CodeGenerator(
         */
       def addReusableFunction(function: UserDefinedFunction): String = {
         val classQualifier = function.getClass.getCanonicalName
    -    val fieldTerm = s"function_${classQualifier.replace('.', '$')}"
    +    val functionSerializedData = serialize(function)
    +    val fieldTerm =
    +      s"""
    +         |function_${classQualifier.replace('.', '$')}_${DigestUtils.md5Hex(functionSerializedData)}
    --- End diff --
    
    I think encoding the serialized object as a String is a good idea. I like the implementation.
    
    Regarding the name collisions: I did a test with two identical scalar functions and everything worked well, because `reusableMemberStatements` is a hash set that deduplicates the terms.
    
    Btw, what else do we need to do for scalar UDFs? They are injected with the same method and hence should be serialized as well. Isn't the only missing piece adding more tests? If so, we could do that in this PR.
    
    What do you think, @sunjincheng121 and @wuchong?


[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r101765235
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala ---
    @@ -0,0 +1,75 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.dataset
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.types.Row
    +import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
    +import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.api.TableEnvironment
    +import org.apache.flink.table.utils._
    +import org.apache.flink.test.util.TestBaseUtils
    +import org.junit.Test
    +import org.junit.runner.RunWith
    +import org.junit.runners.Parameterized
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +@RunWith(classOf[Parameterized])
    +class DataSetUserDefinedFunctionITCase (
    --- End diff --
    
    The problem with many `TableProgramsClusterTestBase` ITCase classes is that each class adds significant testing overhead. For each class, a Flink testing cluster is started and shared across all tests in the file. So a single ITCase with many tests is less of a problem than many classes with one test method each.
    
    I think it makes sense to move this test to `DataSetCorrelateITCase`. Alternatively, we can extend `TableProgramsCollectionTestBase`, which does not start a cluster but runs the program on Java collections. That should be fine for this test.


[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r101450405
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -1463,21 +1465,23 @@ class CodeGenerator(
         */
       def addReusableFunction(function: UserDefinedFunction): String = {
         val classQualifier = function.getClass.getCanonicalName
    -    val fieldTerm = s"function_${classQualifier.replace('.', '$')}"
    +    val functionSerializedData = serialize(function)
    +    val fieldTerm =
    +      s"""
    +         |function_${classQualifier.replace('.', '$')}_${DigestUtils.md5Hex(functionSerializedData)}
    --- End diff --
    
    It seems that if we register two identical functions (not the same reference), their field terms will be the same, which will cause a compile error here. For example:
    
    ```
    val func1 = new MyFunction
    val func2 = new MyFunction
    env.register("func1", func1)
    env.register("func2", func2)
    ```
    
    `func1` and `func2` are two different objects, but their serialized bytes are equal.
    
    BTW, I suggest including this case in your new IT cases.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r101447054
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
    @@ -324,4 +325,19 @@ object UserDefinedFunctionUtils {
         candidate == classOf[Time] && (expected == classOf[Int] || expected == classOf[JInt]) ||
         candidate == classOf[Timestamp] && (expected == classOf[Long] || expected == classOf[JLong])
     
    +  @throws[Exception]
    +  def serialize(function: UserDefinedFunction): String = {
    +    val byteArrayOutPut = new ByteArrayOutputStream
    +    val objectOutPut = new ObjectOutputStream(byteArrayOutPut)
    +    objectOutPut.writeObject(function)
    +    objectOutPut.close()
    +    Base64.encodeBase64URLSafeString(byteArrayOutPut.toByteArray)
    --- End diff --
    
    No, we cannot use that method, because `encodeBase64String` "Encodes binary data using the base64 algorithm into 76 character blocks separated by CRLF." So if the encoded output is longer than 76 characters, it will contain a `CRLF`, which we do not want.
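
To illustrate the line-breaking concern without depending on commons-codec, this sketch swaps in the JDK's `java.util.Base64`: its MIME encoder also splits output into lines of at most 76 characters separated by CRLF, while the URL-safe encoder without padding produces a single line, comparable to `encodeBase64URLSafeString`. `Base64Sketch` is a hypothetical helper, not part of either library.

```scala
import java.util.Base64

object Base64Sketch {
  // Chunked output: lines of at most 76 characters, separated by "\r\n" (no separator at the end).
  def chunked(data: Array[Byte]): String = Base64.getMimeEncoder.encodeToString(data)

  // Single-line output using the URL-safe alphabet ('-' and '_') and no '=' padding.
  def urlSafe(data: Array[Byte]): String = Base64.getUrlEncoder.withoutPadding.encodeToString(data)

  def main(args: Array[String]): Unit = {
    val data = Array.tabulate[Byte](100)(i => i.toByte) // 100 input bytes
    println(chunked(data).split("\r\n").map(_.length).mkString(",")) // prints 76,60
    println(urlSafe(data).length) // prints 134
  }
}
```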


[GitHub] flink issue #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support constr...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3330
  
    @wuchong I just sent you an email with the steps I take to merge a PR. :-)


[GitHub] flink issue #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support constr...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on the issue:

    https://github.com/apache/flink/pull/3330
  
    Hi @twalthr, thanks for the reminder. I'll update the documentation in FLINK-5794.


[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r101460153
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -1463,21 +1465,23 @@ class CodeGenerator(
         */
       def addReusableFunction(function: UserDefinedFunction): String = {
         val classQualifier = function.getClass.getCanonicalName
    -    val fieldTerm = s"function_${classQualifier.replace('.', '$')}"
    +    val functionSerializedData = serialize(function)
    +    val fieldTerm =
    +      s"""
    +         |function_${classQualifier.replace('.', '$')}_${DigestUtils.md5Hex(functionSerializedData)}
    --- End diff --
    
    I think it makes sense to keep the same implementation for UDF and UDTF. What about code-generating a byte[] field that holds the serialized function (as proposed by Fabian)?
    
    >I see two options:
    >1. make the UDF a member of wrapping function. It might be a bit tricky to pass the reference into the code-gen'd function.
    >2. add a final byte[] field into the code-gen'd function that holds the serialized UDF object and deserialize during initialization. This will blow up the code-gen'd string but might work well.
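
Option 2 can be sketched outside of Flink as follows. This is a hypothetical illustration, not Flink's generated code: `GeneratedWrapper` stands in for a code-generated function and `AddOffset` for a user-defined function; the wrapper keeps a field holding the serialized UDF and deserializes it once during initialization (`open()` here).

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream, ObjectStreamClass, Serializable}

// Hypothetical user-defined function with a constructor parameter.
class AddOffset(val offset: Int) extends Serializable {
  def eval(x: Int): Int = x + offset
}

// Hypothetical stand-in for a generated class: it keeps only the serialized UDF bytes
// and restores the UDF instance once, when it is initialized.
class GeneratedWrapper(private val serializedFunction: Array[Byte]) extends Serializable {
  @transient private var function: AddOffset = null

  def open(): Unit = {
    val loader = getClass.getClassLoader
    val in = new ObjectInputStream(new ByteArrayInputStream(serializedFunction)) {
      override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
        try Class.forName(desc.getName, false, loader)
        catch { case _: ClassNotFoundException => super.resolveClass(desc) }
    }
    try function = in.readObject().asInstanceOf[AddOffset] finally in.close()
  }

  def map(x: Int): Int = function.eval(x)
}

object GeneratedWrapper {
  // Serializes a UDF into the byte array that would be embedded in the generated class.
  def serialize(function: Serializable): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(function) finally out.close()
    bytes.toByteArray
  }

  def main(args: Array[String]): Unit = {
    val wrapper = new GeneratedWrapper(serialize(new AddOffset(10)))
    wrapper.open()
    println(wrapper.map(32)) // prints 42
  }
}
```

Only the byte array travels with the wrapper; the UDF field is transient and rebuilt in `open()`, so constructor arguments such as `offset` are preserved.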


[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r101759328
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
    @@ -22,7 +22,9 @@ package org.apache.flink.table.functions.utils
     import java.lang.{Long => JLong, Integer => JInt}
     import java.lang.reflect.{Method, Modifier}
     import java.sql.{Date, Time, Timestamp}
    +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
    --- End diff --
    
    There are some unused imports.


[GitHub] flink issue #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support constr...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on the issue:

    https://github.com/apache/flink/pull/3330
  
    Thank you for the steps. 
    
    merging...


[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r101446217
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala ---
    @@ -0,0 +1,80 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.datastream
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.types.Row
    +import org.apache.flink.table.api.scala.stream.utils.StreamITCase
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.utils.TableFunc3
    +import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
    +import org.apache.flink.table.api.TableEnvironment
    +import org.junit.Assert._
    +import org.junit.Test
    +
    +import scala.collection.mutable
    +
    +class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestBase {
    --- End diff --
    
    This class can go into `org.apache.flink.table.runtime.datastream.DataStreamCorrelateITCase`


[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r101726090
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala ---
    @@ -0,0 +1,75 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.dataset
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.types.Row
    +import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
    +import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.api.TableEnvironment
    +import org.apache.flink.table.utils._
    +import org.apache.flink.test.util.TestBaseUtils
    +import org.junit.Test
    +import org.junit.runner.RunWith
    +import org.junit.runners.Parameterized
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +@RunWith(classOf[Parameterized])
    +class DataSetUserDefinedFunctionITCase (
    --- End diff --
    
    @wuchong, maybe you are right, but I'm not sure. I think @fhueske can give us good advice. What do you think, @fhueske?


[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r101762354
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -1463,21 +1465,23 @@ class CodeGenerator(
         */
       def addReusableFunction(function: UserDefinedFunction): String = {
         val classQualifier = function.getClass.getCanonicalName
    -    val fieldTerm = s"function_${classQualifier.replace('.', '$')}"
    +    val functionSerializedData = serialize(function)
    +    val fieldTerm =
    +      s"""
    +         |function_${classQualifier.replace('.', '$')}_${DigestUtils.md5Hex(functionSerializedData)}
    +       """.stripMargin
     
         val fieldFunction =
           s"""
             |transient $classQualifier $fieldTerm = null;
             |""".stripMargin
         reusableMemberStatements.add(fieldFunction)
     
    -    val constructorTerm = s"constructor_${classQualifier.replace('.', '$')}"
         val constructorAccessibility =
           s"""
    -        |java.lang.reflect.Constructor $constructorTerm =
    -        |  $classQualifier.class.getDeclaredConstructor();
    -        |$constructorTerm.setAccessible(true);
    -        |$fieldTerm = ($classQualifier) $constructorTerm.newInstance();
    +         |$fieldTerm = ($classQualifier)
    +         |org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
    --- End diff --
    
    Replace the hardcoded class name with `${UserDefinedFunctionUtils.getClass.getName.stripSuffix("$")}`.
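Why `stripSuffix("$")` is needed: for a Scala `object`, `getClass` returns the module class, whose name ends in `$`, while generated Java code should reference the class name without that suffix (for top-level objects Scala emits static forwarders there). Deriving the name from the class also keeps it correct if the object is renamed or moved. A minimal sketch, with `UserDefinedFunctionUtilsLike` and `CodeGenNames` as hypothetical stand-ins:

```scala
// Hypothetical stand-in for a Scala `object` such as UserDefinedFunctionUtils.
object UserDefinedFunctionUtilsLike

object CodeGenNames {
  // The module class name ends in '$'; strip it to get the name that
  // generated Java code should use.
  val utilsClassName: String =
    UserDefinedFunctionUtilsLike.getClass.getName.stripSuffix("$")

  // Builds a call expression for generated code instead of hardcoding the name.
  def deserializeCall(bytesTerm: String): String =
    s"$utilsClassName.deserialize($bytesTerm)"
}
```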


[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r101943644
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala ---
    @@ -0,0 +1,75 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.dataset
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.types.Row
    +import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
    +import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.api.TableEnvironment
    +import org.apache.flink.table.utils._
    +import org.apache.flink.test.util.TestBaseUtils
    +import org.junit.Test
    +import org.junit.runner.RunWith
    +import org.junit.runners.Parameterized
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +@RunWith(classOf[Parameterized])
    +class DataSetUserDefinedFunctionITCase (
    --- End diff --
    
    We also need to add tests for the UDF, and adding the UDF tests to `DataSetCorrelateITCase` is not appropriate. So I think extending `TableProgramsCollectionTestBase` is a good way to go.


[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r101975970
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala ---
    @@ -0,0 +1,75 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.dataset
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.types.Row
    +import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
    +import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.api.TableEnvironment
    +import org.apache.flink.table.utils._
    +import org.apache.flink.test.util.TestBaseUtils
    +import org.junit.Test
    +import org.junit.runner.RunWith
    +import org.junit.runners.Parameterized
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +@RunWith(classOf[Parameterized])
    +class DataSetUserDefinedFunctionITCase (
    --- End diff --
    
    Yes, renaming the CorrelateITCases to UserDefinedFunctionsITCases makes sense to me, because then we can add UDF/UDTF/UDAF tests in one ITCase.


[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r101461418
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -1463,21 +1465,23 @@ class CodeGenerator(
         */
       def addReusableFunction(function: UserDefinedFunction): String = {
         val classQualifier = function.getClass.getCanonicalName
    -    val fieldTerm = s"function_${classQualifier.replace('.', '$')}"
    +    val functionSerializedData = serialize(function)
    +    val fieldTerm =
    +      s"""
    +         |function_${classQualifier.replace('.', '$')}_${DigestUtils.md5Hex(functionSerializedData)}
    --- End diff --
    
    Cool, we can discuss it outside this PR.


[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r101944209
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala ---
    @@ -0,0 +1,80 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.datastream
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.types.Row
    +import org.apache.flink.table.api.scala.stream.utils.StreamITCase
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.utils.TableFunc3
    +import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
    +import org.apache.flink.table.api.TableEnvironment
    +import org.junit.Assert._
    +import org.junit.Test
    +
    +import scala.collection.mutable
    +
    +class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestBase {
    --- End diff --
    
    Where should we put the UDF test? If we put the UDTF test into DataStreamCorrelateITCase, do we need to create a separate ITCase for the UDF test? @fhueske @wuchong 


[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r101974425
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ScalarFunction.scala ---
    @@ -56,8 +58,10 @@ abstract class ScalarFunction extends UserDefinedFunction {
         ScalarFunctionCall(this, params)
       }
     
    -  override def toString: String = getClass.getCanonicalName
    -
    +  override def toString: String = {
    +    val md5  =  DigestUtils.md5Hex(serialize(this))
    --- End diff --
    
    I think this is dangerous. 
    `toString` might be overridden by a UDF.
    Rather, add a new `final` method.
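One way to follow this suggestion, sketched with hypothetical names (`FunctionBase`, `PrettyFunc`): leave `toString` overridable and derive the code-gen identifier in a separate `final` method that subclasses cannot change. The identifier format (class name plus MD5 hex of the Java-serialized instance) mirrors the diff; here the MD5 is computed with `java.security.MessageDigest` instead of commons-codec `DigestUtils`.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.security.MessageDigest

// Hypothetical stand-in for the common function base class.
abstract class FunctionBase extends Serializable {

  // Users may override this freely; nothing in code generation depends on it.
  override def toString: String = getClass.getCanonicalName

  // `final`: a subclass that tries to override this fails to compile, so the
  // identifier used for generated field names cannot be changed by a UDF.
  final def functionIdentifier: String = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(this) finally out.close()
    val md5 = MessageDigest.getInstance("MD5").digest(buffer.toByteArray)
    val hex = md5.map(b => f"${b & 0xff}%02x").mkString
    getClass.getCanonicalName.replace('.', '$') + "$" + hex
  }
}

// Hypothetical user function that customizes toString.
final class PrettyFunc(val suffix: String) extends FunctionBase {
  override def toString: String = s"PrettyFunc($suffix)"
}
```

The user-facing `toString` and the internal identifier are now independent: customizing one cannot break naming or deduplication in the generated code.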


[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r101447400
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala ---
    @@ -0,0 +1,75 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.dataset
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.types.Row
    +import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
    +import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.api.TableEnvironment
    +import org.apache.flink.table.utils._
    +import org.apache.flink.test.util.TestBaseUtils
    +import org.junit.Test
    +import org.junit.runner.RunWith
    +import org.junit.runners.Parameterized
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +@RunWith(classOf[Parameterized])
    +class DataSetUserDefinedFunctionITCase (
    --- End diff --
    
    In the beginning, I also wanted to put it into `DataSetCorrelateITCase`, but this class is intended for `UserDefinedFunction` (UDF / UDTF / UDAF) tests, so I think a separate class is better. What do you think?


[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r101448753
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala ---
    @@ -0,0 +1,75 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.dataset
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.types.Row
    +import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
    +import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.api.TableEnvironment
    +import org.apache.flink.table.utils._
    +import org.apache.flink.test.util.TestBaseUtils
    +import org.junit.Test
    +import org.junit.runner.RunWith
    +import org.junit.runners.Parameterized
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +@RunWith(classOf[Parameterized])
    +class DataSetUserDefinedFunctionITCase (
    --- End diff --
    
    I'm not sure about that. Most of the time, scalar functions can be tested with unit tests (see `ScalarFunctionsTest`); they do not all need IT cases. 


[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r101446163
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala ---
    @@ -0,0 +1,75 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.dataset
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.types.Row
    +import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
    +import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.api.TableEnvironment
    +import org.apache.flink.table.utils._
    +import org.apache.flink.test.util.TestBaseUtils
    +import org.junit.Test
    +import org.junit.runner.RunWith
    +import org.junit.runners.Parameterized
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +@RunWith(classOf[Parameterized])
    +class DataSetUserDefinedFunctionITCase (
    --- End diff --
    
    This class can go into `org.apache.flink.table.runtime.dataset.DataSetCorrelateITCase`


[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r101445690
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
    @@ -324,4 +325,19 @@ object UserDefinedFunctionUtils {
         candidate == classOf[Time] && (expected == classOf[Int] || expected == classOf[JInt]) ||
         candidate == classOf[Timestamp] && (expected == classOf[Long] || expected == classOf[JLong])
     
    +  @throws[Exception]
    +  def serialize(function: UserDefinedFunction): String = {
    +    val byteArrayOutPut = new ByteArrayOutputStream
    --- End diff --
    
    You can get the serialized byte array of an object by `InstantiationUtil.serializeObject`.


[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r101974098
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -1463,21 +1465,23 @@ class CodeGenerator(
         */
       def addReusableFunction(function: UserDefinedFunction): String = {
         val classQualifier = function.getClass.getCanonicalName
    -    val fieldTerm = s"function_${classQualifier.replace('.', '$')}"
    +    val functionSerializedData = serialize(function)
    +    val fieldTerm =
    +      s"""
    +         |function_${classQualifier.replace('.', '$')}_${DigestUtils.md5Hex(functionSerializedData)}
    --- End diff --
    
    I agree with @sunjincheng121.
    The naming collisions caused by the md5Hex suffix are actually intentional: they deduplicate identical functions. This can happen when the same function is used more than once, or when the same function has been registered twice and both registrations are used. With `CodeGen.newName`, the serialized data and the initialization code would be generated as often as the same function is used.
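The deduplication argument can be sketched as a simplified, hypothetical model (`ReusableMembers`, `SplitFunc`). It assumes the member declarations are collected in a set, as the `reusableMemberStatements.add(...)` call in the diff suggests. Two instances with identical serialized state get the same field name, so their declaration is emitted once; an instance with different constructor arguments gets its own field.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.security.MessageDigest
import scala.collection.mutable

// Hypothetical UDF stand-in: instances with the same separator serialize identically.
final class SplitFunc(val separator: String) extends Serializable

// Hypothetical, simplified model of the generator's member bookkeeping.
final class ReusableMembers {
  // A set keeps one copy of each distinct member statement, in insertion order.
  val reusableMemberStatements: mutable.LinkedHashSet[String] = mutable.LinkedHashSet[String]()

  private def md5Hex(bytes: Array[Byte]): String =
    MessageDigest.getInstance("MD5").digest(bytes).map(b => f"${b & 0xff}%02x").mkString

  private def serialize(obj: Serializable): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    buffer.toByteArray
  }

  // Same class + same serialized state => same field name => declared only once.
  def addReusableFunction(function: Serializable): String = {
    val classQualifier = function.getClass.getCanonicalName
    val escapedName = classQualifier.replace('.', '$')
    val fieldTerm = s"function_${escapedName}_${md5Hex(serialize(function))}"
    reusableMemberStatements.add(s"transient $classQualifier $fieldTerm = null;")
    fieldTerm
  }
}
```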


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r101446744
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -1463,21 +1465,23 @@ class CodeGenerator(
         */
       def addReusableFunction(function: UserDefinedFunction): String = {
         val classQualifier = function.getClass.getCanonicalName
    -    val fieldTerm = s"function_${classQualifier.replace('.', '$')}"
    +    val functionSerializedData = serialize(function)
    +    val fieldTerm =
    +      s"""
    +         |function_${classQualifier.replace('.', '$')}_${DigestUtils.md5Hex(functionSerializedData)}
    --- End diff --
    
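    1. Calculate the MD5 of the serialized function to prevent variable name duplication (functions of the same class with different constructor parameters get different names).
    2. Encode the digest as hex to avoid special characters, such as "=", in the variable name.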
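A small check of point 2, using hypothetical helper names: a 16-byte MD5 digest always Base64-encodes to 24 characters ending in `==` padding, and `=`, `+` and `/` are not valid Java identifier characters, whereas lowercase hex only produces `0-9` and `a-f`.

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest
import java.util.Base64

object DigestNames {
  def md5(data: Array[Byte]): Array[Byte] = MessageDigest.getInstance("MD5").digest(data)

  // Lowercase hex: 32 characters for an MD5 digest, all legal in Java identifiers.
  def hex(bytes: Array[Byte]): String = bytes.map(b => f"${b & 0xff}%02x").mkString

  // Standard Base64 may contain '+', '/' and '=' padding.
  def base64(bytes: Array[Byte]): String = Base64.getEncoder.encodeToString(bytes)

  // True if every character may appear inside a Java identifier.
  def identifierSafe(s: String): Boolean =
    s.nonEmpty && s.forall(c => Character.isJavaIdentifierPart(c))

  // Digest of some sample bytes, for trying the helpers out.
  val sample: Array[Byte] = md5("function_SplitFunc".getBytes(StandardCharsets.UTF_8))
}
```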


[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r101458924
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -1463,21 +1465,23 @@ class CodeGenerator(
         */
       def addReusableFunction(function: UserDefinedFunction): String = {
         val classQualifier = function.getClass.getCanonicalName
    -    val fieldTerm = s"function_${classQualifier.replace('.', '$')}"
    +    val functionSerializedData = serialize(function)
    +    val fieldTerm =
    +      s"""
    +         |function_${classQualifier.replace('.', '$')}_${DigestUtils.md5Hex(functionSerializedData)}
    --- End diff --
    
    Yes, thanks @wuchong! UDF and UDTF will be a little different, so let's discuss them separately. In FLINK-5794 I will handle the UDF case, taking your suggestion into account.


[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r101760789
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -1463,21 +1465,23 @@ class CodeGenerator(
         */
       def addReusableFunction(function: UserDefinedFunction): String = {
    --- End diff --
    
    By serializing the UDF, we don't need the default constructor anymore, right? So we should update the JavaDocs of the method.
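Why the default constructor is no longer needed: the removed code-gen lines called `getDeclaredConstructor()` reflectively, which fails for a class whose only constructor takes parameters. Java deserialization, by contrast, restores an already-configured instance without running that class's constructors. A sketch with a hypothetical `SplitFunc` and `InstantiationSketch`:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical UDTF-like class: no no-arg constructor, only a parameterized one.
final class SplitFunc(val separator: String) extends Serializable

object InstantiationSketch {

  // Roughly what the removed code-gen lines did: reflectively call the no-arg
  // constructor. Throws NoSuchMethodException if the class has none.
  def viaDefaultConstructor[T](clazz: Class[T]): T = {
    val constructor = clazz.getDeclaredConstructor()
    constructor.setAccessible(true)
    constructor.newInstance()
  }

  // Serialize the configured instance and read it back. Java deserialization
  // does not run SplitFunc's constructor; the constructor arguments survive
  // as serialized field values instead.
  def viaSerialization[T <: Serializable](function: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(function) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```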


[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r102037328
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ScalarFunction.scala ---
    @@ -58,6 +60,11 @@ abstract class ScalarFunction extends UserDefinedFunction {
     
       override def toString: String = getClass.getCanonicalName
     
    +  final def functionIdentifier: String = {
    --- End diff --
    
    `UserDefinedFunction` was changed to an abstract class in the latest master, when default implementations of `open` and `close` were added. I think `functionIdentifier` can be defined there in the same way, and it will not hurt anything.
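    A minimal sketch of the idea, with hypothetical names: because the base class is abstract, a single `final` identifier method can live there and serve both scalar and table functions. The body below (class name plus an MD5 digest of the Java-serialized instance, computed with `java.security.MessageDigest` instead of the `DigestUtils.md5Hex` call used in the diff) is an assumption for illustration, not the PR's actual implementation.

    ```scala
    import java.io.{ByteArrayOutputStream, ObjectOutputStream}
    import java.security.MessageDigest

    // Hypothetical stand-in for UserDefinedFunction after it became an abstract class.
    abstract class BaseFunction extends Serializable {
      // Shared by all subclasses: class name plus a digest of the instance's serialized state,
      // so two instances with different constructor arguments get different identifiers.
      final def functionIdentifier: String = {
        val bytes = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(bytes)
        try out.writeObject(this) finally out.close()
        val md5 = MessageDigest.getInstance("MD5").digest(bytes.toByteArray)
        val hex = md5.map(b => f"${b & 0xff}%02x").mkString
        getClass.getName.replace('.', '$') + "$" + hex
      }
    }

    // Hypothetical scalar- and table-function-like subclasses with constructor parameters.
    class ScaleBy(val factor: Int) extends BaseFunction
    class SplitBy(val separator: String) extends BaseFunction

    object IdentifierDemo {
      def main(args: Array[String]): Unit = {
        println(new ScaleBy(2).functionIdentifier == new ScaleBy(2).functionIdentifier) // true
        println(new ScaleBy(2).functionIdentifier == new ScaleBy(3).functionIdentifier) // false
        println(new SplitBy("#").functionIdentifier.contains("SplitBy$"))              // true
      }
    }
    ```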



[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r101972557
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala ---
    @@ -0,0 +1,75 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.dataset
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.types.Row
    +import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
    +import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.api.TableEnvironment
    +import org.apache.flink.table.utils._
    +import org.apache.flink.test.util.TestBaseUtils
    +import org.junit.Test
    +import org.junit.runner.RunWith
    +import org.junit.runners.Parameterized
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +@RunWith(classOf[Parameterized])
    +class DataSetUserDefinedFunctionITCase (
    --- End diff --
    
    I would really like to avoid having too many ITCase classes. Using the CollectionTestBase only works for batch.
    
    Maybe we can rename the CorrelateITCases to UserDefinedFunctionsITCases.



[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r101455043
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -1463,21 +1465,23 @@ class CodeGenerator(
         */
       def addReusableFunction(function: UserDefinedFunction): String = {
         val classQualifier = function.getClass.getCanonicalName
    -    val fieldTerm = s"function_${classQualifier.replace('.', '$')}"
    +    val functionSerializedData = serialize(function)
    +    val fieldTerm =
    +      s"""
    +         |function_${classQualifier.replace('.', '$')}_${DigestUtils.md5Hex(functionSerializedData)}
    --- End diff --
    
    1. The two functions you wrote above are stateless, so there is no problem when creating an instance.
    2. In fact, we do not have to worry about the variable name: we build a tree, and the code for each UDTF node is generated independently.
    So, we can keep the name `fieldTerm = function_${classQualifier.replace('.', '$')}`.



[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r101946735
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala ---
    @@ -0,0 +1,80 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.datastream
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.types.Row
    +import org.apache.flink.table.api.scala.stream.utils.StreamITCase
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.utils.TableFunc3
    +import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
    +import org.apache.flink.table.api.TableEnvironment
    +import org.junit.Assert._
    +import org.junit.Test
    +
    +import scala.collection.mutable
    +
    +class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestBase {
    --- End diff --
    
    IMO, the scalar UDF tests can be put into `UserDefinedScalarFunctionTest`. Scalar function tests do not need to set up a cluster environment, so a unit test is enough.



[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r101765703
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala ---
    @@ -0,0 +1,80 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.datastream
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.types.Row
    +import org.apache.flink.table.api.scala.stream.utils.StreamITCase
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.utils.TableFunc3
    +import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
    +import org.apache.flink.table.api.TableEnvironment
    +import org.junit.Assert._
    +import org.junit.Test
    +
    +import scala.collection.mutable
    +
    +class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestBase {
    --- End diff --
    
    Streaming integration tests always start a new Flink cluster (there is no collection mode).
    So I think moving this to `DataStreamCorrelateITCase` as @wuchong suggested is a good idea.



[GitHub] flink issue #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support constr...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on the issue:

    https://github.com/apache/flink/pull/3330
  
    Hi @fhueske,
    Thanks for your review. I have added the `functionIdentifier` method.



[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r101942319
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -1463,21 +1465,23 @@ class CodeGenerator(
         */
       def addReusableFunction(function: UserDefinedFunction): String = {
         val classQualifier = function.getClass.getCanonicalName
    -    val fieldTerm = s"function_${classQualifier.replace('.', '$')}"
    +    val functionSerializedData = serialize(function)
    +    val fieldTerm =
    +      s"""
    +         |function_${classQualifier.replace('.', '$')}_${DigestUtils.md5Hex(functionSerializedData)}
    --- End diff --
    
    Hi @wuchong,
    No, scalar UDFs in the Scala Table API do not work well.
    The reason is that when we create `ScalarSqlFunction`, we use `scalarFunction.getClass.getCanonicalName` as the SQL identifier, which produces a wrong result: all instances of the same class share that identifier, regardless of their constructor parameters.
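    A simplified illustration of the problem (not the actual `ScalarSqlFunction` code): an identifier derived only from the class name is the same for every instance of a class, so two instances created with different constructor arguments cannot be told apart. `AddConstant` is a hypothetical function class.

    ```scala
    // Hypothetical scalar-function-like class whose result depends on a constructor argument.
    class AddConstant(val c: Int) {
      def eval(x: Int): Int = x + c
    }

    object IdentifierCollision {
      def main(args: Array[String]): Unit = {
        val addOne = new AddConstant(1)
        val addTen = new AddConstant(10)
        // A class-name-based identifier is identical for both instances...
        println(addOne.getClass.getCanonicalName == addTen.getClass.getCanonicalName) // true
        // ...although the two instances compute different results.
        println(addOne.eval(5)) // 6
        println(addTen.eval(5)) // 15
      }
    }
    ```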
    
    Hi @fhueske,
    We have already discussed many aspects of the UDF implementation in this PR, so I agree to merge FLINK-5794 into this PR.




[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r101445424
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -1463,21 +1465,23 @@ class CodeGenerator(
         */
       def addReusableFunction(function: UserDefinedFunction): String = {
         val classQualifier = function.getClass.getCanonicalName
    -    val fieldTerm = s"function_${classQualifier.replace('.', '$')}"
    +    val functionSerializedData = serialize(function)
    +    val fieldTerm =
    +      s"""
    +         |function_${classQualifier.replace('.', '$')}_${DigestUtils.md5Hex(functionSerializedData)}
    --- End diff --
    
    Why do we need md5Hex here?



[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r101907504
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -1463,21 +1465,23 @@ class CodeGenerator(
         */
       def addReusableFunction(function: UserDefinedFunction): String = {
         val classQualifier = function.getClass.getCanonicalName
    -    val fieldTerm = s"function_${classQualifier.replace('.', '$')}"
    +    val functionSerializedData = serialize(function)
    +    val fieldTerm =
    +      s"""
    +         |function_${classQualifier.replace('.', '$')}_${DigestUtils.md5Hex(functionSerializedData)}
    --- End diff --
    
    >But from tableAPI to SqlFunction the creation process has some work to do because the current UDF parameters do not take effect.
    
    Do you mean "from the Java Table API to SqlFunction"? If so, the Java Table API (i.e., parsing expressions from a string) instantiates a new ScalarFunction instance when it looks up the function call (the relevant line is https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala#L84). TableFunction, in contrast, reuses the object created by the user. You can follow the TableFunction approach to fix this.
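    A simplified model of the difference described above (not `FunctionCatalog`'s actual code): creating a fresh instance from the class alone needs a no-argument constructor and, even when one exists, loses the arguments the user passed, whereas reusing the user's object keeps them. `Multiply` and `ReflectiveInstantiation` are hypothetical names.

    ```scala
    import scala.util.{Failure, Success, Try}

    // Hypothetical function class that only has a parameterized constructor.
    class Multiply(val factor: Int) {
      def eval(x: Int): Int = x * factor
    }

    object ReflectiveInstantiation {
      // Models a lookup that only knows the function's class and builds a new instance from it.
      def instantiate[T](clazz: Class[T]): Try[T] =
        Try(clazz.getDeclaredConstructor().newInstance())

      def main(args: Array[String]): Unit = {
        instantiate(classOf[Multiply]) match {
          case Success(f) => println(s"created, factor = ${f.factor}")
          case Failure(e) => println(s"failed: ${e.getClass.getSimpleName}") // failed: NoSuchMethodException
        }
        // Reusing the object the user created keeps its constructor argument.
        val userInstance = new Multiply(3)
        println(userInstance.eval(2)) // 6
      }
    }
    ```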
    
    BTW, scalar UDFs in the Scala Table API work well, right?



[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r101885634
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -1463,21 +1465,23 @@ class CodeGenerator(
         */
       def addReusableFunction(function: UserDefinedFunction): String = {
         val classQualifier = function.getClass.getCanonicalName
    -    val fieldTerm = s"function_${classQualifier.replace('.', '$')}"
    +    val functionSerializedData = serialize(function)
    +    val fieldTerm =
    +      s"""
    +         |function_${classQualifier.replace('.', '$')}_${DigestUtils.md5Hex(functionSerializedData)}
    --- End diff --
    
    I notice that the md5Hex part of the fieldTerm is never used. What about using `CodeGenUtils.newName` to generate a new function field name (as shown below)? It is a common pattern in `CodeGenerator`: it guarantees there are no naming collisions, and the generated name is more readable. What do you think, @sunjincheng121 @fhueske?
    
    ```
    CodeGenUtils.newName(s"function_${classQualifier.replace('.', '$')}")
    ```
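    A minimal sketch of a counter-based fresh-name generator in the spirit of `CodeGenUtils.newName`; `FreshNames` is a hypothetical object, and the real helper's exact suffix format may differ. Because each call draws a new counter value, two functions of the same class always get distinct field names, with no hashing of the serialized instance required.

    ```scala
    import java.util.concurrent.atomic.AtomicInteger

    // Hypothetical counter-based name generator, modeled on the idea behind CodeGenUtils.newName.
    object FreshNames {
      private val nameCounter = new AtomicInteger(0)

      // Appends a process-wide unique counter value to the given prefix.
      def newName(prefix: String): String = s"$prefix$$${nameCounter.getAndIncrement()}"
    }

    object FreshNamesDemo {
      def main(args: Array[String]): Unit = {
        val classQualifier = "org.example.SplitFunction"
        val prefix = "function_" + classQualifier.replace('.', '$')
        println(FreshNames.newName(prefix)) // function_org$example$SplitFunction$0
        println(FreshNames.newName(prefix)) // function_org$example$SplitFunction$1
      }
    }
    ```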
    
    Regarding a separate PR for scalar UDFs, I think you are right. We can do that in this PR.



[GitHub] flink pull request #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3330#discussion_r101974535
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala ---
    @@ -94,7 +96,10 @@ abstract class TableFunction[T] extends UserDefinedFunction {
         TableFunctionCall(getClass.getSimpleName, this, params, resultType)
       }
     
    -  override def toString: String = getClass.getCanonicalName
    +  override def toString: String = {
    --- End diff --
    
    Same as for `ScalarFunction`.



[GitHub] flink issue #3330: [FLINK-5795][TableAPI&SQL] Improve UDTF to support constr...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on the issue:

    https://github.com/apache/flink/pull/3330
  
    Hi @wuchong, thanks for the review. I have rebased the code onto master.

