Posted to issues@flink.apache.org by Xpray <gi...@git.apache.org> on 2017/04/27 15:32:18 UTC

[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

GitHub user Xpray opened a pull request:

    https://github.com/apache/flink/pull/3791

    [FLINK-6334] [table] Refactoring UDTF interface

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [x] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [x] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/Xpray/flink f6334-ready

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3791.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3791
    
----
commit 588781f3267d77c3a8ef80f971c6741fd3aa9a00
Author: Xpray <le...@gmail.com>
Date:   2017-04-27T15:18:19Z

    [FLINK-6334] [table] Refactoring UDTF interface

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3791#discussion_r114188108
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UserDefinedTableFunctionTest.scala ---
    @@ -50,72 +50,85 @@ class UserDefinedTableFunctionTest extends TableTestBase {
     
         // Java environment
         val javaEnv = mock(classOf[JavaExecutionEnv])
    -    val javaTableEnv = TableEnvironment.getTableEnvironment(javaEnv)
    -    val in2 = javaTableEnv.fromDataStream(jDs).as("a, b, c")
    +    val jtEnv = TableEnvironment.getTableEnvironment(javaEnv)
    +    val in2 = jtEnv.fromDataStream(jDs).as("a, b, c")
     
         // test cross join
         val func1 = new TableFunc1
    -    javaTableEnv.registerFunction("func1", func1)
    +    jtEnv.registerFunction("func1", func1)
         var scalaTable = in1.join(func1('c) as 's).select('c, 's)
    -    var javaTable = in2.join("func1(c).as(s)").select("c, s")
    +    var javaTable = in2.join(jtEnv.tableApply("func1(c)")
    +      .as("s")).select("c, s")
         verifyTableEquals(scalaTable, javaTable)
     
         // test left outer join
         scalaTable = in1.leftOuterJoin(func1('c) as 's).select('c, 's)
    -    javaTable = in2.leftOuterJoin("as(func1(c), s)").select("c, s")
    +    javaTable = in2.leftOuterJoin(
    +      jtEnv.tableApply("func1(c)")
    +        .as("s")
    +    ).select("c, s")
         verifyTableEquals(scalaTable, javaTable)
     
         // test overloading
         scalaTable = in1.join(func1('c, "$") as 's).select('c, 's)
    -    javaTable = in2.join("func1(c, '$') as (s)").select("c, s")
    +    javaTable = in2.join(jtEnv.tableApply("func1(c, '$')")
    +      .as("s")).select("c, s")
         verifyTableEquals(scalaTable, javaTable)
     
         // test custom result type
         val func2 = new TableFunc2
    -    javaTableEnv.registerFunction("func2", func2)
    +    jtEnv.registerFunction("func2", func2)
         scalaTable = in1.join(func2('c) as ('name, 'len)).select('c, 'name, 'len)
    -    javaTable = in2.join("func2(c).as(name, len)").select("c, name, len")
    +    javaTable = in2.join(
    +      jtEnv.tableApply("func2(c)")
    +        .as("name, len"))
    +      .select("c, name, len")
         verifyTableEquals(scalaTable, javaTable)
     
         // test hierarchy generic type
         val hierarchy = new HierarchyTableFunction
    -    javaTableEnv.registerFunction("hierarchy", hierarchy)
    -    scalaTable = in1.join(hierarchy('c) as ('name, 'adult, 'len))
    +    jtEnv.registerFunction("hierarchy", hierarchy)
    +    scalaTable = in1.join(hierarchy('c) as ('name, 'len, 'adult))
           .select('c, 'name, 'len, 'adult)
    -    javaTable = in2.join("AS(hierarchy(c), name, adult, len)")
    +    javaTable = in2.join(jtEnv.tableApply("hierarchy(c)")
    +      .as("name, len, adult"))
           .select("c, name, len, adult")
         verifyTableEquals(scalaTable, javaTable)
     
         // test pojo type
         val pojo = new PojoTableFunc
    -    javaTableEnv.registerFunction("pojo", pojo)
    +    jtEnv.registerFunction("pojo", pojo)
         scalaTable = in1.join(pojo('c))
           .select('c, 'name, 'age)
    -    javaTable = in2.join("pojo(c)")
    +    javaTable = in2.join(jtEnv.tableApply("pojo(c)"))
           .select("c, name, age")
         verifyTableEquals(scalaTable, javaTable)
     
         // test with filter
         scalaTable = in1.join(func2('c) as ('name, 'len))
           .select('c, 'name, 'len).filter('len > 2)
    -    javaTable = in2.join("func2(c) as (name, len)")
    +    javaTable = in2.join(jtEnv.tableApply("func2(c)") as ("name, len"))
           .select("c, name, len").filter("len > 2")
         verifyTableEquals(scalaTable, javaTable)
     
         // test with scalar function
         scalaTable = in1.join(func1('c.substring(2)) as 's)
           .select('a, 'c, 's)
    -    javaTable = in2.join("func1(substring(c, 2)) as (s)")
    +    javaTable = in2.join(jtEnv.tableApply("func1(substring(c, 2))") as ("s"))
           .select("a, c, s")
         verifyTableEquals(scalaTable, javaTable)
     
         // check scala object is forbidden
         expectExceptionThrown(
           tableEnv.registerFunction("func3", ObjectTableFunction), "Scala object")
         expectExceptionThrown(
    -      javaTableEnv.registerFunction("func3", ObjectTableFunction), "Scala object")
    +      jtEnv.registerFunction("func3", ObjectTableFunction), "Scala object")
         expectExceptionThrown(
    -      in1.join(ObjectTableFunction('a, 1)), "Scala object")
    +      {
    --- End diff --
    
    Please avoid unnecessary refactorings. They make PRs harder to review and might be reverted by the next person going over this code.



[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3791#discussion_r114061878
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---
    @@ -309,6 +308,42 @@ class Table(
       }
     
       /**
    +    * Joins this [[Table]] to a user-defined [[org.apache.calcite.schema.TableFunction]]. Similar
    +    * to an SQL left outer join with ON TRUE, but it works with a table function. It returns all
    --- End diff --
    
    I think the description of the outer join is confusing.
    In the sentence "... all the rows from the outer table (table on the left of the operator), and rows that do not match the condition from the table function"
    1. it should be "that do match the condition"
    2. there is no condition
    3. it reads as if the rows of the left side and the table function were unioned.
    
    I think we can assume that the user knows how a left join works. We should simply say that each row of the outer table is joined with each row produced by the table function call, and that the outer row is padded with nulls if the table function returns an empty table.
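    
    A minimal sketch of those semantics (using a hypothetical `split` UDTF and column names):
    ```
    // every row of `table` is kept; 's is null when split('c) emits no rows
    val result = table
      .leftOuterJoin(split('c) as 's)
      .select('a, 'b, 'c, 's)
    ```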



[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3791#discussion_r114558126
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
    @@ -358,4 +360,45 @@ object UserDefinedFunctionUtils {
         InstantiationUtil
           .deserializeObject[UserDefinedFunction](byteData, Thread.currentThread.getContextClassLoader)
       }
    +
    +  /**
    +    * this method is used for create a [[LogicalTableFunctionCall]]
    +    * @param tableEnv
    +    * @param udtf a String represent a TableFunction Call e.g "split(c)"
    +    * @return
    +    */
    +  def createLogicalFunctionCall(tableEnv: TableEnvironment, udtf: String) = {
    --- End diff --
    
    Add return type to method
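    
    For example (a sketch; the return type is taken from the ScalaDoc above and the body is elided):
    ```
    def createLogicalFunctionCall(
        tableEnv: TableEnvironment,
        udtf: String): LogicalTableFunctionCall = {
      ??? // existing parsing logic, unchanged
    }
    ```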



[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3791#discussion_r113833504
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -107,6 +107,11 @@ abstract class TableEnvironment(val config: TableConfig) {
       // registered external catalog names -> catalog
       private val externalCatalogs = new HashMap[String, ExternalCatalog]
     
    +  private lazy val tableFunctionParser = new TableFunctionParser(this)
    +
    +  // the method for converting a udtf String to Table for Java API
    +  final def tableApply(udtf: String): Table = tableFunctionParser(udtf)
    --- End diff --
    
    `TableFunctionParser` only has one method, named `apply`. IMO it's a util method, so here are 3 suggestions:
    * If a class only contains util methods, I suggest changing `class` to `object`; `tableEnv` can then be a parameter of the method.
    * If `TableFunctionParser#apply` is only used for `TableFunction`, I suggest moving the `apply` method into `UserDefinedFunctionUtils`, because all of the functional methods of `UDF/UDTF/UDAF` are in that file.
    * If the method is only used by `TableEnvironment`, could it be implemented internally in `TableEnvironment`?
    What do you think? @Xpray
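    
    A sketch of the first suggestion (the shape is hypothetical, the body elided):
    ```
    object TableFunctionParser {
      // tableEnv becomes a method parameter instead of a constructor argument
      def apply(tableEnv: TableEnvironment, udtf: String): Table = {
        ??? // existing parsing logic moves here unchanged
      }
    }
    ```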



[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3791



[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3791#discussion_r114553215
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -49,11 +49,11 @@ import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTabl
     import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem}
     import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogSchema}
     import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
    -import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference}
    +import org.apache.flink.table.expressions.{Alias, Call, Expression, ExpressionParser, TableFunctionCall, UnresolvedFieldReference}
    --- End diff --
    
    Remove the unnecessary imports



[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3791#discussion_r114204295
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala ---
    @@ -0,0 +1,56 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.api.scala
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.api.Table
    +import org.apache.flink.table.expressions._
    +import org.apache.flink.table.functions.TableFunction
    +import org.apache.flink.table.plan.logical.LogicalTableFunctionCall
    +
    +/**
    +  * Holds methods to convert a [[TableFunction]] (provided by scala user) into a [[Table]]
    +  *
    +  * @param tf The tableFunction to convert.
    +  */
    +class TableFunctionConversions[T](tf: TableFunction[T]) {
    +
    --- End diff --
    
    I think the name `TableFunctionConversions` is OK. A `TableFunction` object is implicitly converted into a `TableFunctionConversions` and the `apply` method converts it into a `Table`.
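    
    I.e., the flow is roughly this (sketch, reusing the `MySplitUDTF` from the ScalaDoc example in this PR):
    ```
    val split = new MySplitUDTF()   // a TableFunction[String]
    // implicit conversion: TableFunction[T] => TableFunctionConversions[T];
    // its apply('c) wraps the call in a Table over a LogicalTableFunctionCall
    val asTable: Table = split('c)
    ```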



[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3791#discussion_r114198491
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---
    @@ -417,12 +452,33 @@ class Table(
     
       private def join(right: Table, joinPredicate: Option[Expression], joinType: JoinType): Table = {
         // check that right table belongs to the same TableEnvironment
    -    if (right.tableEnv != this.tableEnv) {
    +    if (right.tableEnv != null && right.tableEnv != this.tableEnv) {
           throw new ValidationException("Only tables from the same TableEnvironment can be joined.")
         }
    +
    +    val rule: PartialFunction[LogicalNode, LogicalNode] = {
    +      case udtf: LogicalTableFunctionCall if udtf.child == null => {
    +        new LogicalTableFunctionCall(
    +          udtf.functionName,
    +          udtf.tableFunction,
    +          udtf.parameters,
    +          udtf.resultType,
    +          udtf.fieldNames,
    +          this.logicalPlan
    +        ).validate(tableEnv)
    +      }
    +      case other: LogicalNode => other.validate(tableEnv)
    +    }
    +
    +    val newRightPlan = right.logicalPlan.postOrderTransform(rule)
    --- End diff --
    
    Before, this was done by unwrapping the TableFunctionCall and adding the alias. This step also ensured that there are no other operations except Alias and the function call. This logic is still used in the TableFunctionParser. I think we should do similar steps here.




[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3791#discussion_r114183388
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala ---
    @@ -59,10 +59,14 @@ class DataStreamCorrelateRule
             case rel: RelSubset =>
               convertToCorrelate(rel.getRelList.get(0), condition)
     
    -        case calc: FlinkLogicalCalc =>
    +        case calc: FlinkLogicalCalc => {
    --- End diff --
    
    We have to add a check that the Calc only filters but does not modify the input attributes (besides renaming fields). Otherwise, we might lose the projection information (e.g., if one of the table function's attributes is changed by an expression).
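    
    A sketch of such a check against Calcite's `RexProgram` (API usage assumed):
    ```
    import org.apache.calcite.rex.RexInputRef
    import scala.collection.JavaConverters._
    
    // accept the Calc only if every projection is a plain field reference
    // (renaming is fine, computed expressions are not)
    def projectsOnlyFieldRefs(calc: FlinkLogicalCalc): Boolean = {
      val program = calc.getProgram
      program.getProjectList.asScala
        .map(program.expandLocalRef)
        .forall(_.isInstanceOf[RexInputRef])
    }
    ```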



[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3791#discussion_r113839137
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala ---
    @@ -20,8 +20,8 @@ package org.apache.flink.table.api.java
     import org.apache.flink.api.common.typeinfo.TypeInformation
     import org.apache.flink.api.java.typeutils.TypeExtractor
     import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
    -import org.apache.flink.table.expressions.ExpressionParser
     import org.apache.flink.table.api._
    +import org.apache.flink.table.expressions.ExpressionParser
    --- End diff --
    
    Please remove this line.



[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3791#discussion_r114590134
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---
    @@ -93,6 +103,11 @@ class Table(
         * }}}
         */
       def select(fields: Expression*): Table = {
    +    if (UserDefinedFunctionUtils.verifyTableFunctionCallExistence(this)) {
    --- End diff --
    
    I think we can add these checks without touching all methods of `Table`.
    We could implement a method that recursively traverses a `LogicalNode` and checks if one of its children is an unbounded table function call. This check is performed in the constructor of `Table` and throws an exception unless the `logicalNode` itself is a `LogicalTableFunctionCall` (this is the case if it was created with the new constructor or `as()` was applied on it).
    
    That way we can remove all checks in the methods.
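    
    A sketch of such a traversal (names assumed):
    ```
    private def containsUnboundedUDTFCall(node: LogicalNode): Boolean = node match {
      case call: LogicalTableFunctionCall if call.child == null => true
      case _ => node.children.exists(containsUnboundedUDTFCall)
    }
    ```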



[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3791#discussion_r114062529
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---
    @@ -417,12 +452,33 @@ class Table(
     
       private def join(right: Table, joinPredicate: Option[Expression], joinType: JoinType): Table = {
         // check that right table belongs to the same TableEnvironment
    -    if (right.tableEnv != this.tableEnv) {
    +    if (right.tableEnv != null && right.tableEnv != this.tableEnv) {
    --- End diff --
    
    Can we add a check that the root of `right.logicalPlan` is a `LogicalTableFunctionCall` if `right.tableEnv == null`?
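    
    Something like this sketch (error message assumed):
    ```
    if (right.tableEnv == null &&
        !right.logicalPlan.isInstanceOf[LogicalTableFunctionCall]) {
      throw new ValidationException(
        "Only a table function call can be joined without a TableEnvironment.")
    }
    ```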



[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3791#discussion_r114590580
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---
    @@ -417,13 +507,45 @@ class Table(
       }
     
       private def join(right: Table, joinPredicate: Option[Expression], joinType: JoinType): Table = {
    -    // check that right table belongs to the same TableEnvironment
    -    if (right.tableEnv != this.tableEnv) {
    +    if (UserDefinedFunctionUtils.verifyTableFunctionCallExistence(this)) {
    +      throw new ValidationException(
    +        "TableFunctions can only be followed by Alias. e.g table.join(split('c) as ('a, 'b))")
    +    }
    +
    +    // check that the TableEnvironment of right table is not null
    +    // and right table belongs to the same TableEnvironment
    +    if (right.tableEnv != null && right.tableEnv != this.tableEnv) {
    --- End diff --
    
    I would change this method as follows:
    First we check if `right` is a table function call. If not, we translate the join as before. Otherwise, we translate it as a join with a table function.
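    
    A sketch of that structure (the helper names are hypothetical):
    ```
    private def join(right: Table, joinPredicate: Option[Expression], joinType: JoinType): Table = {
      if (isTableFunctionCall(right)) {
        joinWithTableFunction(right, joinPredicate, joinType) // correlate
      } else {
        joinWithTable(right, joinPredicate, joinType) // regular join, as before
      }
    }
    ```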



[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3791#discussion_r114198865
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala ---
    @@ -90,4 +91,7 @@ package object scala extends ImplicitExpressionConversions {
         tableEnv.toDataStream[Row](table)
       }
     
    +  implicit def tableFunctionCall2Table[T](tf: TableFunction[T]): TableFunctionConversions[T] = {
    --- End diff --
    
    We should also add tests to validate that these cases are properly handled.



[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3791#discussion_r114190116
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UserDefinedTableFunctionTest.scala ---
    @@ -174,7 +209,8 @@ class UserDefinedTableFunctionTest extends TableTestBase {
       def testCrossJoin(): Unit = {
         val util = streamTestUtil()
         val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
    -    val function = util.addFunction("func1", new TableFunc1)
    +    val function = new TableFunc1
    --- End diff --
    
    Please revert these (and similar) unnecessary changes. Thanks



[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3791#discussion_r114183462
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetCorrelateRule.scala ---
    @@ -60,10 +60,14 @@ class DataSetCorrelateRule
               case rel: RelSubset =>
                 convertToCorrelate(rel.getRelList.get(0), condition)
     
    -          case calc: FlinkLogicalCalc =>
    +          case calc: FlinkLogicalCalc => {
    --- End diff --
    
    We have to add a check that the Calc only filters but does not modify the input attributes (besides renaming fields). Otherwise, we might lose the projection information (e.g., if one of the table function's attributes is changed by an expression).



[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3791#discussion_r114197458
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala ---
    @@ -90,4 +91,7 @@ package object scala extends ImplicitExpressionConversions {
         tableEnv.toDataStream[Row](table)
       }
     
    +  implicit def tableFunctionCall2Table[T](tf: TableFunction[T]): TableFunctionConversions[T] = {
    --- End diff --
    
    This allows table functions to be used everywhere a Table is expected, e.g., in `union`, `intersect`, `minus`, `rightOuterJoin` and `fullOuterJoin`. We should check these cases and throw a good exception explaining that TableFunctions are currently only supported for `leftOuterJoin` and `join`.
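    
    A sketch of such a guard (the null-tableEnv condition is assumed to identify a bare UDTF Table):
    ```
    private def checkNotTableFunction(right: Table, op: String): Unit = {
      if (right.tableEnv == null) {
        throw new ValidationException(
          s"TableFunctions are currently only supported for join and leftOuterJoin, not for $op.")
      }
    }
    ```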



[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3791#discussion_r113858925
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala ---
    @@ -0,0 +1,56 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.api.scala
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.api.Table
    +import org.apache.flink.table.expressions._
    +import org.apache.flink.table.functions.TableFunction
    +import org.apache.flink.table.plan.logical.LogicalTableFunctionCall
    +
    +/**
    +  * Holds methods to convert a [[TableFunction]] (provided by scala user) into a [[Table]]
    +  *
    +  * @param tf The tableFunction to convert.
    +  */
    +class TableFunctionConversions[T](tf: TableFunction[T]) {
    +
    --- End diff --
    
    In fact, I think it's a `Type`, not a `Conversion`. I'm not sure; I think @fhueske can give us the best suggestion.



[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3791#discussion_r114063055
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalNode.scala ---
    @@ -79,15 +79,21 @@ abstract class LogicalNode extends TreeNode[LogicalNode] {
       protected[logical] def construct(relBuilder: RelBuilder): RelBuilder
     
       def validate(tableEnv: TableEnvironment): LogicalNode = {
    -    val resolvedNode = resolveExpressions(tableEnv)
    -    resolvedNode.expressionPostOrderTransform {
    -      case a: Attribute if !a.valid =>
    -        val from = children.flatMap(_.output).map(_.name).mkString(", ")
    -        failValidation(s"Cannot resolve [${a.name}] given input [$from].")
    -
    -      case e: Expression if e.validateInput().isFailure =>
    -        failValidation(s"Expression $e failed on input check: " +
    -          s"${e.validateInput().asInstanceOf[ValidationFailure].message}")
    +    // A tableFunction may not contain the tableEnv when created by scala user
    +    // We do not validate operators (select, as etc.)
    +    // if they are applied on such TableFunction with empty tableEnv.
    +    if (tableEnv == null) this
    --- End diff --
    
    This looks like a potentially problematic condition which relies on the fact that `tableEnv` may only be `null` in case of a `TableFunction`. It would be much better if we could avoid such special casing in methods which are called from many different places.



[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3791#discussion_r114061983
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---
    @@ -309,6 +308,42 @@ class Table(
       }
     
       /**
    +    * Joins this [[Table]] to a user-defined [[org.apache.calcite.schema.TableFunction]]. Similar
    +    * to an SQL left outer join with ON TRUE, but it works with a table function. It returns all
    +    * the rows from the outer table (table on the left of the operator), and rows that do not match
    +    * the condition from the table function (which is defined in the expression on the right
    +    * side of the operator). Rows with no matching condition are filled with null values.
    +    *
    +    * Scala Example:
    +    * {{{
    +    *   class MySplitUDTF extends TableFunction[String] {
    +    *     def eval(str: String): Unit = {
    +    *       str.split("#").foreach(collect)
    +    *     }
    +    *   }
    +    *
    +    *   val split = new MySplitUDTF()
    +    *   table.leftOuterJoin(split('c) as ('s)).select('a,'b,'c,'s)
    +    * }}}
    +    *
    +    * Java Example:
    +    * {{{
    +    *   class MySplitUDTF extends TableFunction<String> {
    +    *     public void eval(String str) {
    +    *       str.split("#").forEach(this::collect);
    +    *     }
    +    *   }
    +    *
    +    *   TableFunction<String> split = new MySplitUDTF();
    +    *   tableEnv.registerFunction("split", split);
    +    *   table.leftOuterJoin(tableEnv.tableApply("split(c)").as("s"))).select("a, b, c, s");
    --- End diff --
    
    I think the Java syntax could be improved. Could we do something like:
    ```
    table.leftOuterJoin(new Table(tableEnv, "split(c) as s")).select("a, b, c, s");
    ```
    
    by adding a constructor to `Table` like `Table(tEnv: TableEnvironment, tableFunc: String)`?
    IMO, this would make it clearer that the TableFunction creates a table which is joined.
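    
    A sketch of that constructor (assuming the primary constructor is `Table(tableEnv, logicalPlan)` and reusing `createLogicalFunctionCall` from this PR):
    ```
    def this(tableEnv: TableEnvironment, tableFunc: String) = {
      this(tableEnv, UserDefinedFunctionUtils.createLogicalFunctionCall(tableEnv, tableFunc))
    }
    ```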



[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3791#discussion_r113833924
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableFunctionParser.scala ---
    @@ -0,0 +1,66 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.api
    +
    +import org.apache.flink.table.expressions._
    +import org.apache.flink.table.plan.logical.LogicalTableFunctionCall
    +
    +
    +/**
    +  * A parser to convert a udtf String (for Java user) to [[Table]]
    +  *
    +  * @param tableEnv a [[TableEnvironment]] which is used for looking up a function
    +  */
    +class TableFunctionParser(tableEnv: TableEnvironment) {
    +
    --- End diff --
    
    This class only contains a util method. I suggest changing it to an `object`, or moving the method into `UserDefinedFunctionUtils`. And `tableEnv` can be a parameter of the method.



[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3791#discussion_r114197865
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---
    @@ -417,12 +452,33 @@ class Table(
     
       private def join(right: Table, joinPredicate: Option[Expression], joinType: JoinType): Table = {
         // check that right table belongs to the same TableEnvironment
    -    if (right.tableEnv != this.tableEnv) {
    +    if (right.tableEnv != null && right.tableEnv != this.tableEnv) {
    --- End diff --
    
    I think we need to better distinguish the regular table join cases from the table function join cases. 
    We can only join and leftOuterJoin with a table function. Also, the changes below are only required for joins with a table function.



[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

Posted by Xpray <gi...@git.apache.org>.
Github user Xpray commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3791#discussion_r113855406
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala ---
    @@ -0,0 +1,56 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.api.scala
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.api.Table
    +import org.apache.flink.table.expressions._
    +import org.apache.flink.table.functions.TableFunction
    +import org.apache.flink.table.plan.logical.LogicalTableFunctionCall
    +
    +/**
    +  * Holds methods to convert a [[TableFunction]] (provided by scala user) into a [[Table]]
    +  *
    +  * @param tf The tableFunction to convert.
    +  */
    +class TableFunctionConversions[T](tf: TableFunction[T]) {
    +
    --- End diff --
    
    I found that most of the existing implicit conversion classes have a common suffix like XXXConversions, so I think this naming should be clear.



[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3791#discussion_r113834406
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala ---
    @@ -19,11 +19,12 @@ package org.apache.flink.table.api.java
     
     import org.apache.flink.api.common.typeinfo.TypeInformation
     import org.apache.flink.api.java.typeutils.TypeExtractor
    -import org.apache.flink.table.api._
    -import org.apache.flink.table.functions.TableFunction
    -import org.apache.flink.table.expressions.ExpressionParser
     import org.apache.flink.streaming.api.datastream.DataStream
     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    +import org.apache.flink.table.api._
    +import org.apache.flink.table.api.scala.TableFunctionConversions
    --- End diff --
    
    Please remove the useless import.



[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3791#discussion_r114063647
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---
    @@ -417,12 +452,33 @@ class Table(
     
       private def join(right: Table, joinPredicate: Option[Expression], joinType: JoinType): Table = {
         // check that right table belongs to the same TableEnvironment
    -    if (right.tableEnv != this.tableEnv) {
    +    if (right.tableEnv != null && right.tableEnv != this.tableEnv) {
           throw new ValidationException("Only tables from the same TableEnvironment can be joined.")
         }
    +
    +    val rule: PartialFunction[LogicalNode, LogicalNode] = {
    +      case udtf: LogicalTableFunctionCall if udtf.child == null => {
    +        new LogicalTableFunctionCall(
    +          udtf.functionName,
    +          udtf.tableFunction,
    +          udtf.parameters,
    +          udtf.resultType,
    +          udtf.fieldNames,
    +          this.logicalPlan
    +        ).validate(tableEnv)
    +      }
    +      case other: LogicalNode => other.validate(tableEnv)
    +    }
    +
    +    val newRightPlan = right.logicalPlan.postOrderTransform(rule)
    --- End diff --
    
    I think we have to make sure at this point that the right plan is only a `LogicalTableFunctionCall` and an `Alias`. We cannot translate anything else yet and should give a meaningful error message.
    Also, it would be good if we could merge the `Alias` into the `LogicalTableFunctionCall` to get rid of many special cases in the validation process.
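    
    A sketch of that merge (the alias node shape and the `extractFieldNames` helper are assumed):
    ```
    val call = right.logicalPlan match {
      case udtf: LogicalTableFunctionCall =>
        udtf
      case AliasNode(names, udtf: LogicalTableFunctionCall) =>
        new LogicalTableFunctionCall(
          udtf.functionName, udtf.tableFunction, udtf.parameters,
          udtf.resultType, extractFieldNames(names), udtf.child)
      case _ =>
        throw new ValidationException(
          "Only a (possibly aliased) table function call can be joined.")
    }
    ```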



[GitHub] flink issue #3791: [FLINK-6334] [table] Refactoring UDTF interface

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on the issue:

    https://github.com/apache/flink/pull/3791
  
    Hi @Xpray, thanks for the update. I think the PR is in pretty good shape now, except for the name of `TableFunctionConversions`. Looking forward to @fhueske's opinion.
    
    Best,
    SunJincheng



[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3791#discussion_r114062550
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---
    @@ -417,12 +452,33 @@ class Table(
     
       private def join(right: Table, joinPredicate: Option[Expression], joinType: JoinType): Table = {
         // check that right table belongs to the same TableEnvironment
    -    if (right.tableEnv != this.tableEnv) {
    +    if (right.tableEnv != null && right.tableEnv != this.tableEnv) {
           throw new ValidationException("Only tables from the same TableEnvironment can be joined.")
         }
    +
    +    val rule: PartialFunction[LogicalNode, LogicalNode] = {
    +      case udtf: LogicalTableFunctionCall if udtf.child == null => {
    +        new LogicalTableFunctionCall(
    +          udtf.functionName,
    +          udtf.tableFunction,
    +          udtf.parameters,
    +          udtf.resultType,
    +          udtf.fieldNames,
    +          this.logicalPlan
    +        ).validate(tableEnv)
    +      }
    +      case other: LogicalNode => other.validate(tableEnv)
    +    }
    +
    +    val newRightPlan = right.logicalPlan.postOrderTransform(rule)
    +    /**
    +      * if right plan has an unresolved LogicalTableFunctionCall, correlated shall be true
    --- End diff --
    
    format comment into one line `// if right plan ...`



[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3791#discussion_r114556553
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala ---
    @@ -35,6 +36,10 @@ class TableConversions(table: Table) {
       /** Converts the [[Table]] to a [[DataSet]] of the specified type. */
       def toDataSet[T: TypeInformation]: DataSet[T] = {
     
    +    if (UserDefinedFunctionUtils.verifyTableFunctionCallExistence(table)) {
    --- End diff --
    
    We should put this check into `Table.getRelNode`. This will catch all cases when we try to translate a table.
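    
    Sketch (assuming `getRelNode` currently just translates the logical plan):
    ```
    private[flink] def getRelNode: RelNode = {
      if (UserDefinedFunctionUtils.verifyTableFunctionCallExistence(this)) {
        throw new ValidationException(
          "Cannot translate a dangling TableFunction call; use join or leftOuterJoin.")
      }
      logicalPlan.toRelNode(relBuilder)
    }
    ```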



[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3791#discussion_r113836005
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---
    @@ -417,12 +452,33 @@ class Table(
     
       private def join(right: Table, joinPredicate: Option[Expression], joinType: JoinType): Table = {
         // check that right table belongs to the same TableEnvironment
    --- End diff --
    
    // check that the TableEnvironment of the right table is not null and the right table belongs to the same TableEnvironment



[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3791#discussion_r114199929
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetCorrelateRule.scala ---
    @@ -60,10 +60,14 @@ class DataSetCorrelateRule
               case rel: RelSubset =>
                 convertToCorrelate(rel.getRelList.get(0), condition)
     
    -          case calc: FlinkLogicalCalc =>
    +          case calc: FlinkLogicalCalc => {
    --- End diff --
    
    We should (later?) add optimization rules to prevent expressions from being pushed below the join towards the table function (and to push them above the join if they are already below it).
    For now we should just ensure that we do not compute incorrect results.



[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3791#discussion_r114063517
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala ---
    @@ -56,6 +57,8 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend
                 case gcf: GetCompositeField => Alias(gcf, gcf.aliasName().getOrElse(s"_c$i"))
                 case other => Alias(other, s"_c$i")
               }
    +          // This happens when TableFunction.apply returns a Table, ex: t1.join(split('a) as 'b)
    +          case alias: Alias => alias
    --- End diff --
    
    Can we merge the `Alias` into the `LogicalTableFunctionCall` beforehand (maybe in `table.join()`)?
    We should add as few special cases in the validation as possible, IMO.



[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3791#discussion_r113835656
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala ---
    @@ -0,0 +1,56 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.api.scala
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.api.Table
    +import org.apache.flink.table.expressions._
    +import org.apache.flink.table.functions.TableFunction
    +import org.apache.flink.table.plan.logical.LogicalTableFunctionCall
    +
    +/**
    +  * Holds methods to convert a [[TableFunction]] (provided by scala user) into a [[Table]]
    +  *
    +  * @param tf The tableFunction to convert.
    +  */
    +class TableFunctionConversions[T](tf: TableFunction[T]) {
    +
    --- End diff --
    
    I think that before the `TableFunction` is applied, it's just a definition; once it's applied, it's a table. So I'd like the name `AppliedTableFunction`. That gives us the steps for using a `UDTF`: define -> apply -> join.
    What do you think?



[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3791#discussion_r114063404
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---
    @@ -417,12 +452,33 @@ class Table(
     
       private def join(right: Table, joinPredicate: Option[Expression], joinType: JoinType): Table = {
         // check that right table belongs to the same TableEnvironment
    -    if (right.tableEnv != this.tableEnv) {
    +    if (right.tableEnv != null && right.tableEnv != this.tableEnv) {
           throw new ValidationException("Only tables from the same TableEnvironment can be joined.")
         }
    +
    +    val rule: PartialFunction[LogicalNode, LogicalNode] = {
    +      case udtf: LogicalTableFunctionCall if udtf.child == null => {
    +        new LogicalTableFunctionCall(
    +          udtf.functionName,
    +          udtf.tableFunction,
    +          udtf.parameters,
    +          udtf.resultType,
    +          udtf.fieldNames,
    +          this.logicalPlan
    +        ).validate(tableEnv)
    +      }
    +      case other: LogicalNode => other.validate(tableEnv)
    --- End diff --
    
    Split the rule into one rule that attaches the logicalPlan to the root call and another rule that validates all nodes.
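    
    I.e., something like this (sketch, mirroring the code above):
    ```
    val attachLeftInput: PartialFunction[LogicalNode, LogicalNode] = {
      case udtf: LogicalTableFunctionCall if udtf.child == null =>
        new LogicalTableFunctionCall(
          udtf.functionName, udtf.tableFunction, udtf.parameters,
          udtf.resultType, udtf.fieldNames, this.logicalPlan)
    }
    val validateAll: PartialFunction[LogicalNode, LogicalNode] = {
      case node: LogicalNode => node.validate(tableEnv)
    }
    val newRightPlan = right.logicalPlan
      .postOrderTransform(attachLeftInput)
      .postOrderTransform(validateAll)
    ```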



[GitHub] flink pull request #3791: [FLINK-6334] [table] Refactoring UDTF interface

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3791#discussion_r113839148
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala ---
    @@ -20,8 +20,8 @@ package org.apache.flink.table.api.java
     import org.apache.flink.api.common.typeinfo.TypeInformation
     import org.apache.flink.api.java.typeutils.TypeExtractor
     import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
    -import org.apache.flink.table.expressions.ExpressionParser
    --- End diff --
    
    Please restore this line.

