You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by yjshen <gi...@git.apache.org> on 2016/04/20 16:42:41 UTC

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

GitHub user yjshen opened a pull request:

    https://github.com/apache/flink/pull/1916

    [FLINK-3754][Table]Add a validation phase before construct RelNode using TableAPI

    This PR aims at adding an extra phase of **validation** for plans generated from Table API, matches the functionality of Calcite's Validator that are called during we execute an query expressed in SQL String.
    
    In order to do this, I inserted a new layer between TableAPI and `RelNode` construction: The `Logical Plan`.
    
    And the main procedure of validation and RelNode construction work as follows:
    
    1. Constructing logical plan in flight
    2. When we are about to execute the plan:
    3. Do resolution using table/dataset/datastream schema
    4. Do validation on the type annotated logical plan tree
    5. Translate logical plan into `RelNode` once we successfully finished validation.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/yjshen/flink extra_validation

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1916.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1916
    
----
commit 7fb102af1fa52dab2f0c80785f75bf7e0d8a7062
Author: Yijie Shen <he...@gmail.com>
Date:   2016-04-13T08:46:58Z

    make TreeNode extends Product

commit ab75d4857cb203714ee037aea2e55de776c4dd32
Author: Yijie Shen <he...@gmail.com>
Date:   2016-04-15T14:51:20Z

    wip expressions validation, should create expressions for functions next

commit 61e4bb09d754fe0aca7624e3fcd70d364e4154d3
Author: Yijie Shen <he...@gmail.com>
Date:   2016-04-16T07:02:07Z

    add functions for math and string

commit 6abbfad0a11a3874150baa4dcabf6caab37cf0be
Author: Yijie Shen <he...@gmail.com>
Date:   2016-04-18T16:49:04Z

    wip move table api on logicalNode

commit 64ecdbef273895e4527fb6b5120d92acb0d20542
Author: Yijie Shen <he...@gmail.com>
Date:   2016-04-19T05:20:47Z

    resolve and validate next

commit dc42a44cc47cdb619076074eda95b84157554337
Author: Yijie Shen <he...@gmail.com>
Date:   2016-04-19T19:26:58Z

    wip

commit c04292abe6484e095171a66c399c16e50e98870a
Author: Yijie Shen <he...@gmail.com>
Date:   2016-04-20T12:26:25Z

    fix bug in validator, merge eval, add doc

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

Posted by yjshen <gi...@git.apache.org>.
Github user yjshen commented on the pull request:

    https://github.com/apache/flink/pull/1916#issuecomment-213900827
  
    @fhueske, thanks for your job. Looking forward to more discussions on this :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

Posted by yjshen <gi...@git.apache.org>.
Github user yjshen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1916#discussion_r60728611
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala ---
    @@ -0,0 +1,188 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.plan.logical
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.calcite.rel.RelNode
    +import org.apache.calcite.rel.`type`.RelDataTypeFactory
    +import org.apache.calcite.rel.core.JoinRelType
    +import org.apache.calcite.rel.logical.LogicalProject
    +import org.apache.calcite.schema.{Table => CTable}
    +import org.apache.calcite.tools.RelBuilder
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +import org.apache.flink.api.java.operators.join.JoinType
    +import org.apache.flink.api.table.expressions._
    +import org.apache.flink.api.table.typeutils.TypeConverter
    +import org.apache.flink.api.table.validate.ValidationException
    +
    +case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extends UnaryNode {
    +  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
    +
    +  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
    +    def allAlias: Boolean = {
    +      projectList.forall { proj =>
    +        proj match {
    +          case Alias(r: ResolvedFieldReference, name) => true
    +          case _ => false
    +        }
    +      }
    +    }
    +    child.toRelNode(relBuilder)
    +    if (allAlias) {
    +      relBuilder.push(
    +        LogicalProject.create(relBuilder.peek(),
    +          projectList.map(_.toRexNode(relBuilder)).asJava,
    +          projectList.map(_.name).asJava))
    +    } else {
    +      relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*)
    +    }
    +  }
    +}
    +
    +case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) extends UnaryNode {
    +  override def output: Seq[Attribute] =
    +    throw new UnresolvedException("Invalid call to output on AliasNode")
    +
    +  override def toRelNode(relBuilder: RelBuilder): RelBuilder =
    +    throw new UnresolvedException("Invalid call to toRelNode on AliasNode")
    +
    +  override lazy val resolved: Boolean = false
    +}
    +
    +case class Distinct(child: LogicalNode) extends UnaryNode {
    +  override def output: Seq[Attribute] = child.output
    +
    +  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
    +    child.toRelNode(relBuilder)
    +    relBuilder.distinct()
    +  }
    +}
    +
    +case class Filter(condition: Expression, child: LogicalNode) extends UnaryNode {
    +  override def output: Seq[Attribute] = child.output
    +
    +  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
    +    child.toRelNode(relBuilder)
    +    relBuilder.filter(condition.toRexNode(relBuilder))
    +  }
    +}
    +
    +case class Aggregate(
    +    groupingExpressions: Seq[Expression],
    +    aggregateExpressions: Seq[NamedExpression],
    +    child: LogicalNode) extends UnaryNode {
    +
    +  override def output: Seq[Attribute] = {
    +    (groupingExpressions ++ aggregateExpressions) map { agg =>
    +      agg match {
    +        case ne: NamedExpression => ne.toAttribute
    +        case e => Alias(e, e.toString).toAttribute
    +      }
    +    }
    +  }
    +
    +  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
    +    child.toRelNode(relBuilder)
    +    relBuilder.aggregate(
    +      relBuilder.groupKey(groupingExpressions.map(_.toRexNode(relBuilder)).asJava),
    +      aggregateExpressions.filter(_.isInstanceOf[Alias]).map { e =>
    +        e match {
    +          case Alias(agg: Aggregation, name) => agg.toAggCall(name)(relBuilder)
    +          case _ => null // this should never happen
    +        }
    +      }.asJava)
    +  }
    +}
    +
    +case class Union(left: LogicalNode, right: LogicalNode) extends BinaryNode {
    +  override def output: Seq[Attribute] = left.output
    +
    +  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
    +    left.toRelNode(relBuilder)
    +    right.toRelNode(relBuilder)
    +    relBuilder.union(true)
    +  }
    +}
    --- End diff --
    
    Need to override the `resolved` field to check equality of `left` and `right` output schema. Will do in next commit.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

Posted by yjshen <gi...@git.apache.org>.
Github user yjshen commented on the pull request:

    https://github.com/apache/flink/pull/1916#issuecomment-213411425
  
    For ease of review, I would like to explain this RP with more details.
    
    While table api are called to construct a query, it was first constructed as a operator tree, `LogicalNode` is used to express the tree node. Therefore, after we finished constructing a query and about to translate it into a dataset/datastream program, the constructed logical plan looks like:
    ```
    Aggregate(List of group by, List of aggregate)
    +- Filter (condition expression)
        +- Union
            +- Project (select column expression) 
                +- TableScan(DS1) 
            +- Project (select column expression)
                +- TableScan(DS2)
    ```
    At this time, only the leaf node: TableScan (`CatalogNode` in current implementation) is equipped with type information, in order to do plan validation, we need to **annotate** the full logical plan with type information first and use type information to do **validate** works.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

Posted by yjshen <gi...@git.apache.org>.
Github user yjshen commented on the pull request:

    https://github.com/apache/flink/pull/1916#issuecomment-216269369
  
    Will start to work on eager validation now. :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1916#issuecomment-212485745
  
    Thanks @yjshen for working on this issue! Unified validation and exceptions are a big improvement, IMO. I'll try to have a look soon.
    
    @twalthr, can you have a look at the changes done on the expressions?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

Posted by yjshen <gi...@git.apache.org>.
Github user yjshen commented on the pull request:

    https://github.com/apache/flink/pull/1916#issuecomment-213419043
  
    The type annotation work is done from bottom to top:
    
    Firstly, we know each schema of the two input, and we know `List[] expression` in `Project` are used to manipulate one row of table data as input and output one value per expression, therefore, we can infer the the output schema of `Project` (in the current impl this was expressed as: `def output: Seq[Attribute]`) if we know each expressions `dataType`. 
    
    For example, `Add`'s dataType is same as it's input, `Or`'s dataType is always `Boolean`, `pow(a, b)`'s dataType is always `Double`, however, if and only if we understand all kinds of expressions, we are able to infer its `dataType`. The main problems here is we only have `Call`(Unresolved Function) generated during expression construction, therefore, we should resolve them first into solid `Expression`s. `FunctionCatalog` is introduced here for a mapping from `FunctionName -> Expression`, we can easily finish the translation work as we look up `catalog`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

Posted by yjshen <gi...@git.apache.org>.
Github user yjshen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1916#discussion_r61378731
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala ---
    @@ -215,7 +215,7 @@ class ScalarFunctionsTest {
     
       }
     
    -  @Test
    +  @Ignore
    --- End diff --
    
    `exp` `log` `pow` and `ln` should have `Double` as input. What I was thinking is, we should add some extra `type coercion` rules and add an `cast` when we can do it safely(when an expression is asking a `Double` but we provide a `Int`), for example, `Byte` to `Double`, `Long` to `Double` and so on, and rewrite the expression as `pow(cast(x, Double))`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1916#issuecomment-215414605
  
    If we decide for eager validation, we can also simplify the code a bit, right? 
    
    I might be wrong but we could remove `PlanPreparation` and the recursive validation of `LogicalNode` since the inputs of an operator would have been already validated. The checks of `Validator.validate()` could be moved to the respective `LogicalNode`. Not sure if we would still need the `RuleExecutor`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

Posted by yjshen <gi...@git.apache.org>.
Github user yjshen closed the pull request at:

    https://github.com/apache/flink/pull/1916


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1916#discussion_r61291959
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala ---
    @@ -215,7 +215,7 @@ class ScalarFunctionsTest {
     
       }
     
    -  @Test
    +  @Ignore
    --- End diff --
    
    Why did you exclude these tests?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the pull request:

    https://github.com/apache/flink/pull/1916#issuecomment-215061267
  
    @yjshen I looked through the code changes. I was quite impressed that your PR touches nearly every class of the current API.
    
    Basically your changes seem to work, however, I'm not sure if we want to implement the validation phase for every scalar function ourselves. Actually, Calcite already comes with type inference, type checking and validation capabilities. I don't know if we want to reinvent the wheel at this point. Your approach inserts a layer under the Table API for doing the validation. However, instead, this layer could also translate the plan into a SQL tree (on top of RelNodes). We could then let Calcite do the work of validation.
    
    This could also solve another problem that I faced when working on FLINK-3580. If you take a look at `StandardConvertletTable` of Calcite, you see that Calcite also does some conversions which we also need to implement ourselves if we do not base the Table API on top of SQL.
    
    We need to discuss how we want to proceed. Both solution are not perfect.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

Posted by yjshen <gi...@git.apache.org>.
Github user yjshen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1916#discussion_r60728312
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala ---
    @@ -199,12 +151,12 @@ class Table(
         *   tab.filter('name === "Fred")
         * }}}
         */
    -  def filter(predicate: Expression): Table = {
    -
    -    relBuilder.push(relNode)
    -    relBuilder.filter(predicate.toRexNode(relBuilder))
    -    
    -    new Table(relBuilder.build(), tableEnv)
    +  def filter(predicate: Expression): Table = withPlan {
    +//    logicalPlan match {
    +//      case j: Join => j.copy(condition = Some(predicate))
    +//      case o => Filter(predicate, logicalPlan)
    +//    }
    --- End diff --
    
    Will remove this the next commit.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the pull request:

    https://github.com/apache/flink/pull/1916#issuecomment-214332719
  
    Thanks for the contribution @yjshen. I will also have a look at it tomorrow.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

Posted by yjshen <gi...@git.apache.org>.
Github user yjshen commented on the pull request:

    https://github.com/apache/flink/pull/1916#issuecomment-215332571
  
    > I think it would be good to eagerly check each method call of the Table API. This would make debugging easier, because exceptions would be thrown where the error is caused. Please correct me if I am wrong, but I think we would not lose validation coverage compared to the coverage this PR if we do eager validation? It might also be easier, because we do not need the recursive operator traversal (still the expression traversal though). Maybe we can even directly translate to RelNodes after validation, just like we do right now. I think a lot of this PR could be used for eager validation, not sure if it would be easily possible with the SqlNode validation approach. 
    
    Regarding the eager validation you mentioned, I think that could be accomplished by calling `validate()` each time I am constructing another `Table`, in other words, each time I am calling a `Table api`, changing the current code from:
    ``` scala
    class Table(
        private[flink] val tableEnv: TableEnvironment,
        private[flink] val planPreparation: PlanPreparation) {
    
      def this(tableEnv: TableEnvironment, logicalPlan: LogicalNode) = {
        this(tableEnv, new PlanPreparation(tableEnv, logicalPlan))
      }
    ```
    to 
    ``` scala
    def this(tableEnv: TableEnvironment, logicalPlan: LogicalNode) = {
        this(tableEnv, {
          val pp = new PlanPreparation(tableEnv, logicalPlan)
          pp.validate()
          pp
        })
    ```
    and also add an additional flag annotate the logical node as `validate` and therefore avoid the `recursive logical plan traversal` would be enough.  Do I understand your idea correctly?
    On the other hand, I prefer to postponed `RelNode` construction until we are going to run the query on Flink, since we only need `RelNode` at that time.
    
    > While reviewing the PR, I noticed that some classes seem to be partially derived from Spark's code base (e.g., TreeNode and RuleExecutor). I know there are some common patterns that apply in optimizers, but it is good style to give credit to the original source code. 
    Can you list which classes are derived from Spark code and add a comment to them pointing to the source of the code?
    
    OK, I will do a more detailed scan and give credit to all the original source code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1916#discussion_r61291865
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala ---
    @@ -17,13 +17,34 @@
      */
     package org.apache.flink.api.table.expressions
     
    -import java.util.concurrent.atomic.AtomicInteger
    -
     import org.apache.calcite.rex.RexNode
     import org.apache.calcite.tools.RelBuilder
     
    -abstract class Expression extends TreeNode[Expression] { self: Product =>
    -  def name: String = Expression.freshName("expression")
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.table.trees.TreeNode
    +import org.apache.flink.api.table.validate.ExprValidationResult
    +
    +abstract class Expression extends TreeNode[Expression] {
    +  /**
    +    * Returns the [[TypeInformation]] for evaluating this expression.
    +    * It is sometimes available until the expression is valid.
    +    */
    +  def dataType: TypeInformation[_]
    +
    +  /**
    +    * One pass validation of the expression tree in post order.
    +    */
    +  lazy val valid: Boolean = childrenValid && validateInput().isSuccess
    +
    +  def childrenValid: Boolean = children.forall(_.valid)
    +
    +  /**
    +    * Check input data types, inputs number or other properties specified by this expression.
    +    * Return `ValidationSuccess` if it pass the check,
    +    * or `ValidationFailure` with supplement message explaining the error.
    +    * Note: we should only call this method until `childrenValidated == true`
    --- End diff --
    
    `childrenValidated` -> `childrenValid`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

Posted by yjshen <gi...@git.apache.org>.
Github user yjshen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1916#discussion_r61378085
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala ---
    @@ -17,13 +17,34 @@
      */
     package org.apache.flink.api.table.expressions
     
    -import java.util.concurrent.atomic.AtomicInteger
    -
     import org.apache.calcite.rex.RexNode
     import org.apache.calcite.tools.RelBuilder
     
    -abstract class Expression extends TreeNode[Expression] { self: Product =>
    -  def name: String = Expression.freshName("expression")
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.table.trees.TreeNode
    +import org.apache.flink.api.table.validate.ExprValidationResult
    +
    +abstract class Expression extends TreeNode[Expression] {
    +  /**
    +    * Returns the [[TypeInformation]] for evaluating this expression.
    +    * It is sometimes available until the expression is valid.
    +    */
    +  def dataType: TypeInformation[_]
    +
    +  /**
    +    * One pass validation of the expression tree in post order.
    +    */
    +  lazy val valid: Boolean = childrenValid && validateInput().isSuccess
    +
    +  def childrenValid: Boolean = children.forall(_.valid)
    +
    +  /**
    +    * Check input data types, inputs number or other properties specified by this expression.
    +    * Return `ValidationSuccess` if it pass the check,
    +    * or `ValidationFailure` with supplement message explaining the error.
    +    * Note: we should only call this method until `childrenValidated == true`
    --- End diff --
    
    Ah, yes!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

Posted by yjshen <gi...@git.apache.org>.
Github user yjshen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1916#discussion_r61382801
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala ---
    @@ -355,4 +380,26 @@ object TableEnvironment {
         new ScalaStreamTableEnv(executionEnvironment, tableConfig)
       }
     
    +  /**
    +    * The primary workflow for executing plan validation for that generated from Table API.
    +    * The validation is intentionally designed as a lazy procedure and triggered when we
    +    * are going to run on Flink core.
    +    */
    +  class PlanPreparation(val env: TableEnvironment, val logical: LogicalNode) {
    +
    +    lazy val resolvedPlan: LogicalNode = env.getValidator.resolve(logical)
    +
    +    def validate(): Unit = env.getValidator.validate(resolvedPlan)
    +
    +    lazy val relNode: RelNode = {
    +      env match {
    +        case _: BatchTableEnvironment =>
    --- End diff --
    
    Yes, it shouldn't be distinguished. The difference should exist inside validator's rule set.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

Posted by yjshen <gi...@git.apache.org>.
Github user yjshen commented on the pull request:

    https://github.com/apache/flink/pull/1916#issuecomment-215336755
  
    Hi @fhueske @twalthr, thanks for the detailed review work! 👍 
    I've left some comment above to express my opinion, looking forward to more discussions on this :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1916#discussion_r61291698
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala ---
    @@ -17,13 +17,34 @@
      */
     package org.apache.flink.api.table.expressions
     
    -import java.util.concurrent.atomic.AtomicInteger
    -
     import org.apache.calcite.rex.RexNode
     import org.apache.calcite.tools.RelBuilder
     
    -abstract class Expression extends TreeNode[Expression] { self: Product =>
    -  def name: String = Expression.freshName("expression")
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.table.trees.TreeNode
    +import org.apache.flink.api.table.validate.ExprValidationResult
    +
    +abstract class Expression extends TreeNode[Expression] {
    +  /**
    +    * Returns the [[TypeInformation]] for evaluating this expression.
    +    * It is sometimes available until the expression is valid.
    --- End diff --
    
    +not ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1916#issuecomment-215142967
  
    Hi @yjshen, thanks for your patience. I also finished a first pass over the PR. 
    
    I'd like to propose a third alternative, in addition to the custom validation phase (this PR) and generating `SqlNode`s and using Calcite's validator. Both approaches would mean that the validation happens before the logical plan is translated into a `RelNode`. I think it would be good to eagerly check each method call of the Table API. This would make debugging easier, because exceptions would be thrown where the error is caused. Please correct me if I am wrong, but I think we would not lose validation coverage compared to the coverage this PR if we do eager validation? It might also be easier, because we do not need the recursive operator traversal (still the expression traversal though). Maybe we can even directly translate to `RelNode`s after validation, just like we do right now. I think a lot of this PR could be used for eager validation, not sure if it would be easily possible with the `SqlNode` validation approach. 
    What do you think about eagerly validation, @yjshen and @twalthr?
    
    While reviewing the PR, I noticed that some classes seem to be partially derived from Spark's code base (e.g., `TreeNode` and `RuleExecutor`). I know there are some common patterns that apply in optimizers, but it is good style to give credit to the original source code. 
    Can you list which classes are derived from Spark code and add a comment to them pointing to the source of the code?
    
    Thanks, Fabian


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1916#issuecomment-213567820
  
    Thanks for the additional information @yjshen! I'm a bit behind with PR reviews, but will definitely have a look begin of next week. Thanks, Fabian


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1916#issuecomment-216270686
  
    Great, thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1916#discussion_r61291621
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala ---
    @@ -355,4 +380,26 @@ object TableEnvironment {
         new ScalaStreamTableEnv(executionEnvironment, tableConfig)
       }
     
    +  /**
    +    * The primary workflow for executing plan validation for that generated from Table API.
    +    * The validation is intentionally designed as a lazy procedure and triggered when we
    +    * are going to run on Flink core.
    +    */
    +  class PlanPreparation(val env: TableEnvironment, val logical: LogicalNode) {
    +
    +    lazy val resolvedPlan: LogicalNode = env.getValidator.resolve(logical)
    +
    +    def validate(): Unit = env.getValidator.validate(resolvedPlan)
    +
    +    lazy val relNode: RelNode = {
    +      env match {
    +        case _: BatchTableEnvironment =>
    --- End diff --
    
    Why do you distinguish here? It's the same code, no?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

Posted by yjshen <gi...@git.apache.org>.
Github user yjshen commented on the pull request:

    https://github.com/apache/flink/pull/1916#issuecomment-215422057
  
    @fhueske If that's the preferred way, I will try to do some simplifications and see.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

Posted by yjshen <gi...@git.apache.org>.
Github user yjshen commented on the pull request:

    https://github.com/apache/flink/pull/1916#issuecomment-215334972
  
    @twalthr, translate table API call to `SqlNode` and use `validator` in Calcite seems another reasonable solution, maybe it's hard to do `eager validation` as @fhueske proposed? 
    Yes, I agree we should discuss which way is preferred solution :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---