Posted to issues@flink.apache.org by godfreyhe <gi...@git.apache.org> on 2017/01/20 06:46:55 UTC

[GitHub] flink pull request #3176: [FLINK-5571] [table] add open and close methods fo...

GitHub user godfreyhe opened a pull request:

    https://github.com/apache/flink/pull/3176

    [FLINK-5571] [table] add open and close methods for UserDefinedFunction

    Currently, a user-defined function (UDF) in the Table API & SQL works on zero, one, or multiple values in a custom evaluation method. Many UDFs need more advanced features, e.g. reporting metrics, reading parameters from the job configuration, or accessing extra data from a distributed cache file. Adding open and close methods to the UserDefinedFunction class solves this problem.
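
    A hypothetical sketch of the intended usage (the `open(context: UDFContext)` signature and the `HashCodeFunction` / `hashcode_factor` names are illustrative assumptions based on the new `UDFContext` class, not necessarily the final API):

    ```Scala
    import org.apache.flink.table.functions.{ScalarFunction, UDFContext}

    class HashCodeFunction extends ScalarFunction {

      private var factor: Int = 0

      // called once before the first evaluation; reads a global job parameter
      override def open(context: UDFContext): Unit = {
        factor = context.getJobParameter("hashcode_factor", "12").toInt
      }

      def eval(s: String): Int = s.hashCode * factor

      // called once after the last evaluation; nothing to release in this sketch
      override def close(): Unit = {}
    }
    ```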

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/godfreyhe/flink udf-open-close

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3176.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3176
    
----
commit 4ac2be7b3ab4dc0e8a18e43c79a031d7a16ee1ea
Author: godfreyhe <go...@163.com>
Date:   2017-01-20T06:42:12Z

    add open and close methods for UserDefinedFunction

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3176: [FLINK-5571] [table] add open and close methods fo...

Posted by godfreyhe <gi...@git.apache.org>.
Github user godfreyhe commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3176#discussion_r100469073
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala ---
    @@ -24,4 +24,19 @@ package org.apache.flink.table.functions
       * User-defined functions must have a default constructor and must be instantiable during runtime.
       */
     trait UserDefinedFunction {
    --- End diff --
    
    Yes, it works! Please refer to: http://alvinalexander.com/scala/how-to-wrap-scala-traits-used-accessed-java-classes-methods


---

[GitHub] flink pull request #3176: [FLINK-5571] [table] add open and close methods fo...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3176#discussion_r100470762
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala ---
    @@ -24,4 +24,19 @@ package org.apache.flink.table.functions
       * User-defined functions must have a default constructor and must be instantiable during runtime.
       */
     trait UserDefinedFunction {
    --- End diff --
    
    Copied from the blog you linked:
    >To summarize: If you're writing a Scala API that will be used by Java clients, don't expose traits that have implemented behavior in your public API. If you have traits like that, wrap them in a class for your Java consumers.
    
    The wrapper class mentioned in the blog is a concrete class defined in Scala, not an abstract class (like `ScalarFunction`). If we extend this class in Java, the same error will be thrown. I think defining `UserDefinedFunction` as abstract can easily fix this problem with no side effects.
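
    A minimal sketch of the suggestion (my assumption of how it could look, not the PR's final code; the `open(context: UDFContext)` parameter is also assumed): declaring `UserDefinedFunction` as an abstract class with empty default implementations lets Java subclasses compile without overriding `open` and `close`.

    ```Scala
    package org.apache.flink.table.functions

    abstract class UserDefinedFunction {

      // no-op defaults, so Java subclasses are not forced to override them
      def open(context: UDFContext): Unit = {}

      def close(): Unit = {}
    }
    ```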


---

[GitHub] flink pull request #3176: [FLINK-5571] [table] add open and close methods fo...

Posted by godfreyhe <gi...@git.apache.org>.
Github user godfreyhe commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3176#discussion_r99532146
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -220,56 +247,105 @@ class CodeGenerator(
         // manual casting here
         val samHeader =
           // FlatMapFunction
    -      if (clazz == classOf[FlatMapFunction[_,_]]) {
    +      if (clazz == classOf[FlatMapFunction[_, _]]) {
    --- End diff --
    
    We cannot choose between `FlatMapFunction` and `RichFlatMapFunction` in the `translateToPlan` method unless we know whether the current RexNode contains `UserDefinedFunction`s. However, the CodeGenerator is a kind of RexVisitor and learns about `UserDefinedFunction`s when `generateExpression` is called.
    Besides, `RichFlatMapFunction` implements `FlatMapFunction`, so it is reasonable that users call `generateFunction(desc, classOf[FlatMapFunction[Any, Any]], body, returnType)` and get back a RichFlatMapFunction.
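
    To illustrate the type-compatibility point (a standalone sketch, not code from this PR): a `RichFlatMapFunction` can be used wherever a `FlatMapFunction` is expected.

    ```Scala
    import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
    import org.apache.flink.util.Collector

    object RichIsAFunction {
      // assigning a RichFlatMapFunction to a FlatMapFunction reference compiles fine
      val f: FlatMapFunction[String, String] = new RichFlatMapFunction[String, String] {
        override def flatMap(value: String, out: Collector[String]): Unit = out.collect(value)
      }
    }
    ```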


---

[GitHub] flink pull request #3176: [FLINK-5571] [table] add open and close methods fo...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3176#discussion_r101246957
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -122,6 +123,18 @@ class CodeGenerator(
       // we use a LinkedHashSet to keep the insertion order
       private val reusableInitStatements = mutable.LinkedHashSet[String]()
     
    +  // generate RichFunction(e.g. RichFlatMapFunction) if true
    +  // generate Function(e.g. FlatMapFunction) if false
    +  private var generatedRichFunction = false
    --- End diff --
    
    This increases the complexity of the code generator. Why don't we generate RichFunctions by default? Every RichFunction is a valid Function anyway.


---

[GitHub] flink pull request #3176: [FLINK-5571] [table] add open and close methods fo...

Posted by godfreyhe <gi...@git.apache.org>.
Github user godfreyhe commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3176#discussion_r99544629
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala ---
    @@ -24,4 +24,19 @@ package org.apache.flink.table.functions
       * User-defined functions must have a default constructor and must be instantiable during runtime.
       */
     trait UserDefinedFunction {
    --- End diff --
    
    I tested it successfully in Java without implementing the `open` and `close` methods.


---

[GitHub] flink pull request #3176: [FLINK-5571] [table] add open and close methods fo...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3176#discussion_r101247688
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -220,56 +247,105 @@ class CodeGenerator(
         // manual casting here
         val samHeader =
           // FlatMapFunction
    -      if (clazz == classOf[FlatMapFunction[_,_]]) {
    +      if (clazz == classOf[FlatMapFunction[_, _]]) {
    --- End diff --
    
    As I mentioned above, why not keep it simple and always generate RichFunctions? The `clazz` parameter is still correct, as `RichFlatMapFunction` implements `FlatMapFunction`.


---

[GitHub] flink pull request #3176: [FLINK-5571] [table] add open and close methods fo...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3176#discussion_r101248702
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UDFContext.scala ---
    @@ -0,0 +1,53 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.functions
    +
    +import java.io.File
    +
    +import org.apache.flink.annotation.PublicEvolving
    +import org.apache.flink.api.common.functions.RuntimeContext
    +import org.apache.flink.metrics.MetricGroup
    +
    +class UDFContext(context: RuntimeContext) {
    --- End diff --
    
    Please also add a Javadoc here.


---

[GitHub] flink pull request #3176: [FLINK-5571] [table] add open and close methods fo...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3176#discussion_r99568239
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala ---
    @@ -24,4 +24,19 @@ package org.apache.flink.table.functions
       * User-defined functions must have a default constructor and must be instantiable during runtime.
       */
     trait UserDefinedFunction {
    --- End diff --
    
    Did you create a Java class which extends `ScalarFunction` or `TableFunction`? I tried this in my environment but the compilation failed.
    
    ```java
    public class MyScalarFunction extends ScalarFunction {
    	public int eval(int a) {
    		return a + 1;
    	}
    }
    ```
    
    ```
    Error:(25, 8) java: org.apache.flink.table.examples.java.MyScalarFunction is not abstract and does not
     override abstract method close() in org.apache.flink.table.functions.UserDefinedFunction
    ```


---

[GitHub] flink pull request #3176: [FLINK-5571] [table] add open and close methods fo...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3176#discussion_r97036095
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -220,56 +247,105 @@ class CodeGenerator(
         // manual casting here
         val samHeader =
           // FlatMapFunction
    -      if (clazz == classOf[FlatMapFunction[_,_]]) {
    +      if (clazz == classOf[FlatMapFunction[_, _]]) {
    --- End diff --
    
    I would like to determine the baseClass by the `clazz` parameter, not by the `generatedRichFunctions` boolean flag. Users call `generateFunction(desc, classOf[FlatMapFunction[Any, Any]], body, returnType)` but get back a RichFlatMapFunction, which is weird and uncontrollable.


---

[GitHub] flink pull request #3176: [FLINK-5571] [table] add open and close methods fo...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3176#discussion_r101265531
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala ---
    @@ -24,4 +24,19 @@ package org.apache.flink.table.functions
       * User-defined functions must have a default constructor and must be instantiable during runtime.
       */
     trait UserDefinedFunction {
    --- End diff --
    
    Did you run your code on Java 7 or Java 8? Java 8 introduces default implementations for interfaces, but Java 7 could cause problems. I think it won't hurt if we convert it into an `abstract class`.


---

[GitHub] flink pull request #3176: [FLINK-5571] [table] add open and close methods fo...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3176#discussion_r101269209
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala ---
    @@ -43,10 +44,16 @@ class FlatMapRunner[IN, OUT](
         val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
         LOG.debug("Instantiating FlatMapFunction.")
         function = clazz.newInstance()
    +    FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
    --- End diff --
    
    Don't we have to add this to the `CorrelateFlatMapRunner` as well?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3176: [FLINK-5571] [table] add open and close methods fo...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3176#discussion_r100483661
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala ---
    @@ -24,4 +24,19 @@ package org.apache.flink.table.functions
       * User-defined functions must have a default constructor and must be instantiable during runtime.
       */
     trait UserDefinedFunction {
    --- End diff --
    
    I tried your code and my code again and it works fine. I do not know why it did not work before...
    
    Thank you for your explanation.


---

[GitHub] flink pull request #3176: [FLINK-5571] [table] add open and close methods fo...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3176#discussion_r101249809
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UDFContext.scala ---
    @@ -0,0 +1,53 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.functions
    +
    +import java.io.File
    +
    +import org.apache.flink.annotation.PublicEvolving
    +import org.apache.flink.api.common.functions.RuntimeContext
    +import org.apache.flink.metrics.MetricGroup
    +
    +class UDFContext(context: RuntimeContext) {
    +
    +  /**
    +    * Returns the metric group for this parallel subtask.
    +    */
    +  @PublicEvolving
    --- End diff --
    
    The Table API has no annotations yet as the API can still change. Can you remove all annotations here?


---

[GitHub] flink issue #3176: [FLINK-5571] [table] add open and close methods for UserD...

Posted by godfreyhe <gi...@git.apache.org>.
Github user godfreyhe commented on the issue:

    https://github.com/apache/flink/pull/3176
  
    Thanks for the suggestions @twalthr, I have updated the PR.


---

[GitHub] flink pull request #3176: [FLINK-5571] [table] add open and close methods fo...

Posted by godfreyhe <gi...@git.apache.org>.
Github user godfreyhe commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3176#discussion_r100478286
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala ---
    @@ -24,4 +24,19 @@ package org.apache.flink.table.functions
       * User-defined functions must have a default constructor and must be instantiable during runtime.
       */
     trait UserDefinedFunction {
    --- End diff --
    
    The Scala wrapper class can be an abstract class.
    
    MathTrait.scala
    ```Scala
    trait MathTrait {
        def sum(x: Int, y: Int) = x + y
    }
    ```
    
    MathTraitWrapper.scala
    ```Scala
    abstract class MathTraitWrapper extends MathTrait {
    }
    ```
    
    JavaMath.java
    ```Java
    public class JavaMath extends MathTraitWrapper {
    	public static void main(String[] args) {
    		new JavaMath();
    	}
    
    	public JavaMath() {
    		System.out.println(sum(2, 2));
    	}
    }
    ```
    
    This code works as expected, printing the number 4 when it is run.


---

[GitHub] flink pull request #3176: [FLINK-5571] [table] add open and close methods fo...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3176#discussion_r101265088
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UDFContext.scala ---
    @@ -0,0 +1,53 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.functions
    +
    +import java.io.File
    +
    +import org.apache.flink.annotation.PublicEvolving
    +import org.apache.flink.api.common.functions.RuntimeContext
    +import org.apache.flink.metrics.MetricGroup
    +
    +class UDFContext(context: RuntimeContext) {
    +
    +  /**
    +    * Returns the metric group for this parallel subtask.
    +    */
    +  @PublicEvolving
    +  def getMetricGroup: MetricGroup = context.getMetricGroup
    +
    +  /**
    +    * Get the local temporary file copies of distributed cache files
    --- End diff --
    
    Please also add Javadoc for parameters and return values. This is a user-facing API and should be well documented. Please also add some website documentation.


---

[GitHub] flink pull request #3176: [FLINK-5571] [table] add open and close methods fo...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3176#discussion_r97037393
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UDFContext.scala ---
    @@ -0,0 +1,52 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.functions
    +
    +import java.io.File
    +
    +import org.apache.flink.annotation.PublicEvolving
    +import org.apache.flink.api.common.functions.RuntimeContext
    +import org.apache.flink.metrics.MetricGroup
    +
    +class UDFContext(context: RuntimeContext) {
    +
    +  /**
    +    * Returns the metric group for this parallel subtask.
    +    */
    +  @PublicEvolving
    +  def getMetricGroup: MetricGroup = context.getMetricGroup
    +
    +  /**
    +    * Get the local temporary file copies of distributed cache files
    +    */
    +  def getDistributedCacheFile(name: String): File = context.getDistributedCache.getFile(name)
    +
    +  /**
    +    * Get the global job parameter
    +    * which is set by ExecutionEnvironment.getConfig.setGlobalJobParameters()
    +    */
    +  @PublicEvolving
    +  def getJobParameter(key: String, default: String): String = {
    +    context.getExecutionConfig.getGlobalJobParameters match {
    --- End diff --
    
    I think a simple if-else is better than a Scala pattern match here.
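
    A rough sketch of the if-else variant I have in mind (assuming the lookup goes through `GlobalJobParameters.toMap`; not the exact PR code):

    ```Scala
    def getJobParameter(key: String, default: String): String = {
      val globalParams = context.getExecutionConfig.getGlobalJobParameters
      if (globalParams != null) {
        val parameters = globalParams.toMap
        // fall back to the default when no parameters were set or the key is missing
        if (parameters != null && parameters.containsKey(key)) parameters.get(key) else default
      } else {
        default
      }
    }
    ```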


---

[GitHub] flink pull request #3176: [FLINK-5571] [table] add open and close methods fo...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3176#discussion_r101447660
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala ---
    @@ -24,4 +24,19 @@ package org.apache.flink.table.functions
       * User-defined functions must have a default constructor and must be instantiable during runtime.
       */
     trait UserDefinedFunction {
    --- End diff --
    
    I tried it in Java 7, and it works.


---

[GitHub] flink pull request #3176: [FLINK-5571] [table] add open and close methods fo...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3176#discussion_r99566482
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -220,56 +247,105 @@ class CodeGenerator(
         // manual casting here
         val samHeader =
           // FlatMapFunction
    -      if (clazz == classOf[FlatMapFunction[_,_]]) {
    +      if (clazz == classOf[FlatMapFunction[_, _]]) {
    --- End diff --
    
    Makes sense to me. I missed the case of `ScalarFunction` before.


---

[GitHub] flink pull request #3176: [FLINK-5571] [table] add open and close methods fo...

Posted by godfreyhe <gi...@git.apache.org>.
Github user godfreyhe commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3176#discussion_r99548048
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/UserDefinedTableFunctionITCase.scala ---
    @@ -0,0 +1,106 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.api.scala.batch.sql
    +
    +import org.apache.flink.api.scala.util.CollectionDataSets
    +import org.apache.flink.api.scala.{ExecutionEnvironment, _}
    +import org.apache.flink.table.api.TableEnvironment
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.api.scala.batch.utils.UDFTestUtils
    +import org.apache.flink.table.utils.{RichTableFunc0, RichTableFunc1}
    +import org.apache.flink.test.util.TestBaseUtils
    +import org.apache.flink.types.Row
    +import org.junit.Test
    +
    +import scala.collection.JavaConverters._
    +
    +class UserDefinedTableFunctionITCase {
    +
    +  @Test
    +  def testOpenClose(): Unit = {
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    tEnv.registerFunction("RichTableFunc0", new RichTableFunc0)
    +
    +    val sqlQuery = "SELECT a, s FROM t1, LATERAL TABLE(RichTableFunc0(c)) as T(s)"
    +
    +    val ds = CollectionDataSets.get3TupleDataSet(env)
    +    tEnv.registerDataSet("t1", ds, 'a, 'b, 'c)
    +
    +    val result = tEnv.sql(sqlQuery)
    +
    +    val expected =
    +      "1,Hi\n2,Hello\n3,Hello world\n4,Hello world, how are you?\n5,I am fine.\n6,Luke Skywalker"
    +    val results = result.toDataSet[Row].collect()
    +    TestBaseUtils.compareResultAsText(results.asJava, expected)
    +  }
    +
    +  @Test
    +  def testSingleUDTFWithParameter(): Unit = {
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    tEnv.registerFunction("RichTableFunc1", new RichTableFunc1)
    +    UDFTestUtils.setJobParameters(env, Map("word_separator" -> " "))
    +
    +    val sqlQuery = "SELECT a, s FROM t1, LATERAL TABLE(RichTableFunc1(c)) as T(s)"
    +
    +    val ds = CollectionDataSets.getSmall3TupleDataSet(env)
    +    tEnv.registerDataSet("t1", ds, 'a, 'b, 'c)
    +
    +    val result = tEnv.sql(sqlQuery)
    +
    +    val expected = "3,Hello\n3,world"
    +    val results = result.toDataSet[Row].collect()
    +    TestBaseUtils.compareResultAsText(results.asJava, expected)
    +  }
    +
    +  @Test
    +  def testMultiUDTFs(): Unit = {
    --- End diff --
    
    Yes, each `RichTableFunction` will generate an independent FlatMap function, and I think this test is also meaningful. I will add cases that test a UDTF together with a UDF later.


---

[GitHub] flink issue #3176: [FLINK-5571] [table] add open and close methods for UserD...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/3176
  
    Thanks for the update @godfreyhe. I will have a final pass through the code and merge it.


---

[GitHub] flink pull request #3176: [FLINK-5571] [table] add open and close methods fo...

Posted by godfreyhe <gi...@git.apache.org>.
Github user godfreyhe commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3176#discussion_r101437890
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UDFContext.scala ---
    @@ -0,0 +1,53 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.functions
    +
    +import java.io.File
    +
    +import org.apache.flink.annotation.PublicEvolving
    +import org.apache.flink.api.common.functions.RuntimeContext
    +import org.apache.flink.metrics.MetricGroup
    +
    +class UDFContext(context: RuntimeContext) {
    +
    +  /**
    +    * Returns the metric group for this parallel subtask.
    +    */
    +  @PublicEvolving
    --- End diff --
    
    OK


---

[GitHub] flink pull request #3176: [FLINK-5571] [table] add open and close methods fo...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3176#discussion_r97043055
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/UserDefinedTableFunctionITCase.scala ---
    @@ -0,0 +1,106 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.api.scala.batch.sql
    +
    +import org.apache.flink.api.scala.util.CollectionDataSets
    +import org.apache.flink.api.scala.{ExecutionEnvironment, _}
    +import org.apache.flink.table.api.TableEnvironment
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.api.scala.batch.utils.UDFTestUtils
    +import org.apache.flink.table.utils.{RichTableFunc0, RichTableFunc1}
    +import org.apache.flink.test.util.TestBaseUtils
    +import org.apache.flink.types.Row
    +import org.junit.Test
    +
    +import scala.collection.JavaConverters._
    +
    +class UserDefinedTableFunctionITCase {
    +
    +  @Test
    +  def testOpenClose(): Unit = {
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    tEnv.registerFunction("RichTableFunc0", new RichTableFunc0)
    +
    +    val sqlQuery = "SELECT a, s FROM t1, LATERAL TABLE(RichTableFunc0(c)) as T(s)"
    +
    +    val ds = CollectionDataSets.get3TupleDataSet(env)
    +    tEnv.registerDataSet("t1", ds, 'a, 'b, 'c)
    +
    +    val result = tEnv.sql(sqlQuery)
    +
    +    val expected =
    +      "1,Hi\n2,Hello\n3,Hello world\n4,Hello world, how are you?\n5,I am fine.\n6,Luke Skywalker"
    +    val results = result.toDataSet[Row].collect()
    +    TestBaseUtils.compareResultAsText(results.asJava, expected)
    +  }
    +
    +  @Test
    +  def testSingleUDTFWithParameter(): Unit = {
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    tEnv.registerFunction("RichTableFunc1", new RichTableFunc1)
    +    UDFTestUtils.setJobParameters(env, Map("word_separator" -> " "))
    +
    +    val sqlQuery = "SELECT a, s FROM t1, LATERAL TABLE(RichTableFunc1(c)) as T(s)"
    +
    +    val ds = CollectionDataSets.getSmall3TupleDataSet(env)
    +    tEnv.registerDataSet("t1", ds, 'a, 'b, 'c)
    +
    +    val result = tEnv.sql(sqlQuery)
    +
    +    val expected = "3,Hello\n3,world"
    +    val results = result.toDataSet[Row].collect()
    +    TestBaseUtils.compareResultAsText(results.asJava, expected)
    +  }
    +
    +  @Test
    +  def testMultiUDTFs(): Unit = {
    --- End diff --
    
    Actually, this can't test multiple RichTableFunctions in one generated function. RichTableFunc0 will join with t1 first, and then RichTableFunc1 will join with the result of the previous join.
    
    I suggest testing a UDTF together with a UDF, such as `RichTableFunc0(RichScalarFunc0(c))`, to test multiple RichUserDefinedFunctions in one generated FlatMapFunction.
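
    A rough sketch of such a test (hypothetical: it assumes a `RichScalarFunc0` test utility exists alongside `RichTableFunc0`, and the expected result is omitted):

    ```Scala
    @Test
    def testUDTFWithUDF(): Unit = {
      val env = ExecutionEnvironment.getExecutionEnvironment
      val tEnv = TableEnvironment.getTableEnvironment(env)
      tEnv.registerFunction("RichTableFunc0", new RichTableFunc0)
      tEnv.registerFunction("RichScalarFunc0", new RichScalarFunc0)

      // both rich functions should end up in one generated FlatMapFunction
      val sqlQuery = "SELECT a, s FROM t1, LATERAL TABLE(RichTableFunc0(RichScalarFunc0(c))) as T(s)"

      val ds = CollectionDataSets.getSmall3TupleDataSet(env)
      tEnv.registerDataSet("t1", ds, 'a, 'b, 'c)

      val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
      // assertions against the expected rows omitted in this sketch
    }
    ```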


---

[GitHub] flink pull request #3176: [FLINK-5571] [table] add open and close methods fo...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3176#discussion_r97038745
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala ---
    @@ -24,4 +24,19 @@ package org.apache.flink.table.functions
       * User-defined functions must have a default constructor and must be instantiable during runtime.
       */
     trait UserDefinedFunction {
    --- End diff --
    
    The `UserDefinedFunction` should be declared as an `abstract class` now, because a trait is recognized as an interface (without default implementations) in Java, so Java users would have to implement `open()` and `close()` if `UserDefinedFunction` remains a trait.


---

[GitHub] flink pull request #3176: [FLINK-5571] [table] add open and close methods fo...

Posted by godfreyhe <gi...@git.apache.org>.
Github user godfreyhe commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3176#discussion_r101428511
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala ---
    @@ -24,4 +24,19 @@ package org.apache.flink.table.functions
       * User-defined functions must have a default constructor and must be instantiable during runtime.
       */
     trait UserDefinedFunction {
    --- End diff --
    
    It works in Java 7


---

[GitHub] flink pull request #3176: [FLINK-5571] [table] add open and close methods fo...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3176#discussion_r97033852
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -220,56 +247,105 @@ class CodeGenerator(
         // manual casting here
         val samHeader =
           // FlatMapFunction
    -      if (clazz == classOf[FlatMapFunction[_,_]]) {
    +      if (clazz == classOf[FlatMapFunction[_, _]]) {
    +        val baseClass = if (generatedRichFunctions) {
    +          classOf[RichFlatMapFunction[_, _]]
    +        } else {
    +          classOf[FlatMapFunction[_, _]]
    +        }
             val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
    -        (s"void flatMap(Object _in1, org.apache.flink.util.Collector $collectorTerm)",
    +        (baseClass,
    +          s"void flatMap(Object _in1, org.apache.flink.util.Collector $collectorTerm)",
               List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;"))
           }
     
           // MapFunction
    -      else if (clazz == classOf[MapFunction[_,_]]) {
    +      else if (clazz == classOf[MapFunction[_, _]]) {
    +        val baseClass = if (generatedRichFunctions) {
    +          classOf[RichMapFunction[_, _]]
    +        } else {
    +          classOf[MapFunction[_, _]]
    +        }
             val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
    -        ("Object map(Object _in1)",
    +        (baseClass,
    +          "Object map(Object _in1)",
               List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;"))
           }
     
           // FlatJoinFunction
    -      else if (clazz == classOf[FlatJoinFunction[_,_,_]]) {
    +      else if (clazz == classOf[FlatJoinFunction[_, _, _]]) {
    +        val baseClass = if (generatedRichFunctions) {
    +          classOf[RichFlatJoinFunction[_, _, _]]
    +        } else {
    +          classOf[FlatJoinFunction[_, _, _]]
    +        }
             val inputTypeTerm1 = boxedTypeTermForTypeInfo(input1)
             val inputTypeTerm2 = boxedTypeTermForTypeInfo(input2.getOrElse(
    -            throw new CodeGenException("Input 2 for FlatJoinFunction should not be null")))
    -        (s"void join(Object _in1, Object _in2, org.apache.flink.util.Collector $collectorTerm)",
    +          throw new CodeGenException("Input 2 for FlatJoinFunction should not be null")))
    +        (baseClass,
    +          s"void join(Object _in1, Object _in2, org.apache.flink.util.Collector $collectorTerm)",
               List(s"$inputTypeTerm1 $input1Term = ($inputTypeTerm1) _in1;",
    -          s"$inputTypeTerm2 $input2Term = ($inputTypeTerm2) _in2;"))
    +               s"$inputTypeTerm2 $input2Term = ($inputTypeTerm2) _in2;"))
           }
           else {
             // TODO more functions
             throw new CodeGenException("Unsupported Function.")
           }
     
    -    val funcCode = j"""
    -      public class $funcName
    -          implements ${clazz.getCanonicalName} {
    +    val funcCode = if (generatedRichFunctions) {
    --- End diff --
    
    There is a lot of duplicate code between the RichFunction codegen and the non-RichFunction codegen. The only difference between them is the open/close method code, so I think it would be better to **insert** the open/close code only when a RichFunction is generated.
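
    Roughly what I mean (an illustrative sketch of the template splicing, not the actual CodeGenerator code; all names here are made up):

    ```Scala
    object CodegenTemplateSketch {

      // generates the open/close methods only for rich functions, otherwise an empty string
      def openCloseBlock(isRich: Boolean, openStatements: String, closeStatements: String): String =
        if (isRich) {
          s"""
             |@Override
             |public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
             |  $openStatements
             |}
             |
             |@Override
             |public void close() throws Exception {
             |  $closeStatements
             |}
           """.stripMargin
        } else {
          ""
        }

      // one shared class template; header is e.g. "implements FlatMapFunction" or "extends RichFlatMapFunction"
      def classTemplate(funcName: String, header: String, openClose: String, samBody: String): String =
        s"""
           |public class $funcName $header {
           |  $openClose
           |  $samBody
           |}
         """.stripMargin
    }
    ```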


---

[GitHub] flink issue #3176: [FLINK-5571] [table] add open and close methods for UserD...

Posted by godfreyhe <gi...@git.apache.org>.
Github user godfreyhe commented on the issue:

    https://github.com/apache/flink/pull/3176
  
    Hi @wuchong, thanks for the review.
    
    I think `UserDefinedScalarFunctionTest` is not enough, because the `open` and `close` methods and the job parameters strongly depend on the runtime, so it's better to cover them in integration tests.
    
    I will add a test to verify a Java ScalarFunction later.


---

[GitHub] flink pull request #3176: [FLINK-5571] [table] add open and close methods fo...

Posted by godfreyhe <gi...@git.apache.org>.
Github user godfreyhe commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3176#discussion_r101437939
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -122,6 +123,18 @@ class CodeGenerator(
       // we use a LinkedHashSet to keep the insertion order
       private val reusableInitStatements = mutable.LinkedHashSet[String]()
     
    +  // generate RichFunction(e.g. RichFlatMapFunction) if true
    +  // generate Function(e.g. FlatMapFunction) if false
    +  private var generatedRichFunction = false
    --- End diff --
    
    OK


---

[GitHub] flink pull request #3176: [FLINK-5571] [table] add open and close methods fo...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3176


---