Posted to issues@flink.apache.org by yjshen <gi...@git.apache.org> on 2016/05/24 05:00:31 UTC

[GitHub] flink pull request: [FLINK-3941][TableAPI]Add support for UNION (w...

GitHub user yjshen opened a pull request:

    https://github.com/apache/flink/pull/2025

    [FLINK-3941][TableAPI]Add support for UNION (with duplicate elimination)

    This PR aims at adding `UNION` support in TableAPI and SQL by:
    - Extending Table API with a new `union()` method
    - Relaxing `DataSetUnionRule` to enable union conversion
    - Adding a `distinct` after `union` in the Flink execution plan to eliminate duplicate rows
    
    Note: I think `Union` currently has no counterpart in DataStream, so it is left unsupported there. If that's not true, I'd like to adapt this PR.
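Here is a rough illustration of the semantics described above. This is not Flink code. UNION ALL keeps every row from both inputs, while UNION is UNION ALL followed by duplicate elimination. The sketch models this on plain Scala collections, with tuples standing in for rows. The object and method names are hypothetical.

```scala
// Plain-collection model of SQL union semantics (no Flink dependency).
object UnionSemantics {
  // UNION ALL: concatenate both inputs, keeping every duplicate.
  def unionAll[T](left: Seq[T], right: Seq[T]): Seq[T] = left ++ right

  // UNION: UNION ALL followed by duplicate elimination, i.e. the
  // "union, then distinct" plan described in the PR.
  def union[T](left: Seq[T], right: Seq[T]): Seq[T] = unionAll(left, right).distinct
}

// Usage:
//   val a = Seq((1, "Hi"), (2, "Hello"))
//   val b = Seq((1, "Hi"), (3, "World"))
//   UnionSemantics.unionAll(a, b) // 4 rows, (1, "Hi") appears twice
//   UnionSemantics.union(a, b)    // 3 rows
```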

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/yjshen/flink union

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2025.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2025
    
----
commit fb8b61b5638f0b52f5857341c7acc95e8985b2d4
Author: Yijie Shen <he...@gmail.com>
Date:   2016-05-24T04:46:21Z

    add union support

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3941][TableAPI]Add support for UNION (w...

Posted by yjshen <gi...@git.apache.org>.
Github user yjshen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2025#discussion_r64382835
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala ---
    @@ -69,16 +73,23 @@ class DataSetUnion(
           rows + metadata.getRowCount(child)
         }
     
    -    planner.getCostFactory.makeCost(rowCnt, 0, 0)
    +    planner.getCostFactory.makeCost(
    +      rowCnt,
    +      if (all) 0 else rowCnt,
    +      if (all) 0 else rowCnt)
       }
     
       override def translateToPlan(
           tableEnv: BatchTableEnvironment,
           expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
     
    -    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
    -    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
    -    leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
    +    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    +    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    +    if (all) {
    +      leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
    +    } else {
    +      leftDataSet.union(rightDataSet).distinct().asInstanceOf[DataSet[Any]]
    --- End diff --
    
    In `DATASET_OPT_RULES`, `UnionToDistinctRule` substitutes `Union` with `UnionAll` followed by an `Aggregate`, so this branch never actually gets executed.
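Below is a toy sketch of the rewrite described above. It uses hypothetical plan classes, not Calcite's or Flink's. Every non-ALL union becomes a UNION ALL wrapped in a distinct aggregate. Once the rule has run, no non-ALL union is left for `DataSetUnion` to translate.

```scala
// Toy logical plan; node names are hypothetical stand-ins.
object UnionRewrite {
  sealed trait Plan
  final case class Scan(name: String) extends Plan
  final case class Union(left: Plan, right: Plan, all: Boolean) extends Plan
  // Grouping on all columns without aggregate functions == duplicate elimination.
  final case class DistinctAggregate(input: Plan) extends Plan

  // Replace every UNION (all = false) with a distinct aggregate over UNION ALL,
  // rewriting children recursively.
  def unionToDistinct(plan: Plan): Plan = plan match {
    case Union(l, r, false) =>
      DistinctAggregate(Union(unionToDistinct(l), unionToDistinct(r), all = true))
    case Union(l, r, true) =>
      Union(unionToDistinct(l), unionToDistinct(r), all = true)
    case DistinctAggregate(in) => DistinctAggregate(unionToDistinct(in))
    case s: Scan => s
  }

  // True if a non-ALL union survives, i.e. the distinct() branch in
  // translateToPlan could still be reached.
  def hasNonAllUnion(plan: Plan): Boolean = plan match {
    case Union(_, _, false)    => true
    case Union(l, r, true)     => hasNonAllUnion(l) || hasNonAllUnion(r)
    case DistinctAggregate(in) => hasNonAllUnion(in)
    case _: Scan               => false
  }
}
```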



[GitHub] flink pull request: [FLINK-3941][TableAPI]Add support for UNION (w...

Posted by yjshen <gi...@git.apache.org>.
Github user yjshen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2025#discussion_r64383299
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala ---
    @@ -69,16 +73,23 @@ class DataSetUnion(
           rows + metadata.getRowCount(child)
         }
     
    -    planner.getCostFactory.makeCost(rowCnt, 0, 0)
    +    planner.getCostFactory.makeCost(
    +      rowCnt,
    +      if (all) 0 else rowCnt,
    +      if (all) 0 else rowCnt)
       }
     
       override def translateToPlan(
           tableEnv: BatchTableEnvironment,
           expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
     
    -    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
    -    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
    -    leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
    +    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    +    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    --- End diff --
    
    `expectedType` is passed down to `Union`'s children, which makes it possible to convert them to the `Row` type enforced by `Aggregate`.



[GitHub] flink pull request: [FLINK-3941][TableAPI]Add support for UNION (w...

Posted by yjshen <gi...@git.apache.org>.
Github user yjshen commented on the pull request:

    https://github.com/apache/flink/pull/2025#issuecomment-221225446
  
    Hi @fhueske, thanks for the review! The changes are:
    - Remove the whole `java/batch/table/UnionITCase.java` 
    - Remove `testUnionWithFilter`, `testUnionWithJoin` and `testUnionWithAggregate`
    - Add a SQL union test.
    
    Does it look better now?



[GitHub] flink pull request: [FLINK-3941][TableAPI]Add support for UNION (w...

Posted by yjshen <gi...@git.apache.org>.
Github user yjshen commented on the pull request:

    https://github.com/apache/flink/pull/2025#issuecomment-221508839
  
    @fhueske would you please take another look? Thanks.



[GitHub] flink pull request: [FLINK-3941][TableAPI]Add support for UNION (w...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2025#discussion_r64549056
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala ---
    @@ -69,16 +73,23 @@ class DataSetUnion(
           rows + metadata.getRowCount(child)
         }
     
    -    planner.getCostFactory.makeCost(rowCnt, 0, 0)
    +    planner.getCostFactory.makeCost(
    +      rowCnt,
    +      if (all) 0 else rowCnt,
    +      if (all) 0 else rowCnt)
       }
     
       override def translateToPlan(
           tableEnv: BatchTableEnvironment,
           expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
     
    -    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
    -    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
    -    leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
    +    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    +    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    +    if (all) {
    +      leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
    +    } else {
    +      leftDataSet.union(rightDataSet).distinct().asInstanceOf[DataSet[Any]]
    --- End diff --
    
    This method should be called by the optimizer when a new `DataSetUnion` node is created, to estimate the cost of the subplan (the new node plus all recursive input nodes). If there is a cheaper plan that does the same thing, the more expensive plan is discarded.
    
    So, you have a plan with a non-all union operator (i.e., `UNION` rather than `UNION ALL`), and the optimizer knows the cost of this plan. Then the `UnionToDistinctRule` is called and new union and distinct operators are created. For both, the `computeSelfCost` method is called to compute the cost estimate of the plan, and then the cheaper of the two plans is preserved. (This is a bit simplified, because the optimization rules are applied on `LogicalRel` nodes but the cost estimation happens on the `DataSetRel` nodes.)
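Here is a simplified sketch of the cost-based choice described above. It assumes a hypothetical three-part cost and uses a plain sum as the comparison key; real cost models compare differently. Each alternative's cumulative cost is its own cost plus the cost of all its inputs, and the cheapest alternative is kept.

```scala
// Hypothetical cost model; not Calcite's RelOptCost.
object CheapestPlan {
  final case class Cost(rows: Double, cpu: Double, io: Double) {
    def +(o: Cost): Cost = Cost(rows + o.rows, cpu + o.cpu, io + o.io)
    // Simplified comparison key: an unweighted sum of the three components.
    def total: Double = rows + cpu + io
  }

  final case class Node(name: String, selfCost: Cost, inputs: Seq[Node] = Nil) {
    // Cost of the subplan: this node plus all recursive inputs.
    def cumulativeCost: Cost = inputs.map(_.cumulativeCost).foldLeft(selfCost)(_ + _)
  }

  // Among equivalent alternatives, keep the one with the lowest total cost.
  def cheapest(alternatives: Seq[Node]): Node = alternatives.minBy(_.cumulativeCost.total)
}
```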



[GitHub] flink pull request: [FLINK-3941][TableAPI]Add support for UNION (w...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2025#discussion_r64541694
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala ---
    @@ -69,16 +73,23 @@ class DataSetUnion(
           rows + metadata.getRowCount(child)
         }
     
    -    planner.getCostFactory.makeCost(rowCnt, 0, 0)
    +    planner.getCostFactory.makeCost(
    +      rowCnt,
    +      if (all) 0 else rowCnt,
    +      if (all) 0 else rowCnt)
       }
     
       override def translateToPlan(
           tableEnv: BatchTableEnvironment,
           expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
     
    -    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
    -    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
    -    leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
    +    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    +    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    +    if (all) {
    +      leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
    +    } else {
    +      leftDataSet.union(rightDataSet).distinct().asInstanceOf[DataSet[Any]]
    --- End diff --
    
    Oh, yes. Completely forgot about that rule... 😊
    So, we already supported the non-all union for SQL. Only the Table API was missing the `union()` method.
    I think there are two ways to continue: 
    - remove the `UnionToDistinctRule` from `FlinkRuleSets`
    - revert the changes on `DataSetUnion` (except for pushing down the `expectedType`) and `DataSetUnionRule`.
    
    I am fine either way.



[GitHub] flink pull request: [FLINK-3941][TableAPI]Add support for UNION (w...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/2025#issuecomment-221796795
  
    Perfect! Good to merge. Thanks @yjshen!



[GitHub] flink pull request: [FLINK-3941][TableAPI]Add support for UNION (w...

Posted by yjshen <gi...@git.apache.org>.
Github user yjshen commented on the pull request:

    https://github.com/apache/flink/pull/2025#issuecomment-221762766
  
    Hi @fhueske, I've updated the `expectedType` fix and reverted the changes in `DataSetUnion` and `DataSetUnionRule` to keep the implementation clear and simple. What do you think?



[GitHub] flink pull request: [FLINK-3941][TableAPI]Add support for UNION (w...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2025#discussion_r64338066
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala ---
    @@ -69,7 +73,7 @@ class DataSetUnion(
           rows + metadata.getRowCount(child)
         }
     
    -    planner.getCostFactory.makeCost(rowCnt, 0, 0)
    +    planner.getCostFactory.makeCost(if (all) rowCnt else rowCnt * 0.1, 0, 0)
    --- End diff --
    
    The cost for union should be higher than for union all. Also, `rowCnt` is the number of rows processed, not the result size.
    How about this?
    ```
    planner.getCostFactory.makeCost(
      rowCnt, 
      if (all) 0 else rowCnt,
      if (all) 0 else rowCnt)
    ```
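The suggested formula can be sketched as a standalone function. `Cost` here is a hypothetical case class, not Calcite's `RelOptCost`. As in the suggestion, `rowCnt` is the number of rows processed across all inputs. Only the non-ALL variant pays CPU and I/O for deduplication, so UNION always costs at least as much as UNION ALL.

```scala
// Hypothetical stand-in for the planner's cost object.
object UnionCost {
  final case class Cost(rowCount: Double, cpu: Double, io: Double)

  // rowCnt = rows processed (sum over all inputs), not the result size.
  // UNION ALL only forwards rows; UNION additionally pays CPU and I/O
  // proportional to rowCnt for duplicate elimination.
  def unionCost(inputRowCounts: Seq[Double], all: Boolean): Cost = {
    val rowCnt = inputRowCounts.sum
    Cost(
      rowCnt,
      if (all) 0.0 else rowCnt,
      if (all) 0.0 else rowCnt)
  }
}
```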



[GitHub] flink pull request: [FLINK-3941][TableAPI]Add support for UNION (w...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2025#discussion_r64542323
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala ---
    @@ -69,16 +73,23 @@ class DataSetUnion(
           rows + metadata.getRowCount(child)
         }
     
    -    planner.getCostFactory.makeCost(rowCnt, 0, 0)
    +    planner.getCostFactory.makeCost(
    +      rowCnt,
    +      if (all) 0 else rowCnt,
    +      if (all) 0 else rowCnt)
       }
     
       override def translateToPlan(
           tableEnv: BatchTableEnvironment,
           expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
     
    -    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
    -    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
    -    leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
    +    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    +    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    --- End diff --
    
    Actually, this fix should be extended. If `expectedType == None`, we must ensure that both inputs provide the same type, i.e., request the type of the left input on the right input (or vice versa).
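Below is a minimal sketch of the extended fix. Hypothetical `Input` and `PhysType` types stand in for `DataSetRel` and `TypeInformation`. The parent's expected type is pushed to both inputs. When there is none, the left input's resulting type is requested on the right input, so both sides of the union always agree.

```scala
// Hypothetical model; not Flink's DataSetRel/TypeInformation API.
object UnionInputTypes {
  sealed trait PhysType
  case object TupleType extends PhysType
  case object RowType extends PhysType

  // An input produces its native type unless a specific type is requested.
  final case class Input(native: PhysType) {
    def translate(requested: Option[PhysType]): PhysType = requested.getOrElse(native)
  }

  // Push the parent's expected type to both inputs; if there is none,
  // request the left input's resulting type on the right input.
  def translateUnion(left: Input, right: Input, expected: Option[PhysType]): (PhysType, PhysType) = {
    val leftType = left.translate(expected)
    val rightType = right.translate(expected.orElse(Some(leftType)))
    (leftType, rightType)
  }
}
```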



[GitHub] flink pull request: [FLINK-3941][TableAPI]Add support for UNION (w...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2025#discussion_r64367480
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala ---
    @@ -44,7 +44,7 @@ class AggregateMapFunction[IN, OUT](
     
       override def map(value: IN): OUT = {
         
    -    val input = value.asInstanceOf[Row]
    +    val input = value.asInstanceOf[Product]
    --- End diff --
    
    Currently, aggregates only support `Row`, because the aggregate code is not generated yet.
    `DataSetAggregate` enforces `Row` as its input type (see `DataSetAggregate` line 99), so `value.asInstanceOf[Row]` should be safe.
    
    Did you observe a problem with this cast?
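The sketch below shows why the cast is safe only when `Row` is really enforced upstream. It uses a hypothetical `Row` stand-in, not Flink's class. A genuine `Row` passes the cast, while a `scala.Tuple3` reaching the same code fails with a `ClassCastException`.

```scala
// Hypothetical Row stand-in; not Flink's Row class.
object AggregateInputCast {
  final class Row(arity: Int) {
    private val fields = new Array[Any](arity)
    def setField(i: Int, v: Any): Unit = fields(i) = v
    def getField(i: Int): Any = fields(i)
  }

  // Mirrors the original cast: correct only if the input really is a Row.
  def firstField(value: Any): Any = value.asInstanceOf[Row].getField(0)

  // True if the Row cast throws for this input (e.g. a Tuple3).
  def rowCastFails(value: Any): Boolean =
    try { firstField(value); false }
    catch { case _: ClassCastException => true }
}
```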



[GitHub] flink pull request: [FLINK-3941][TableAPI]Add support for UNION (w...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2025



[GitHub] flink pull request: [FLINK-3941][TableAPI]Add support for UNION (w...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/2025#issuecomment-221232111
  
    Changes look good. Can you also please update the supported feature set in the docs (`docs/apis/table.md`)?
    
    Should be good to merge once that is done. 
    Thanks, Fabian



[GitHub] flink pull request: [FLINK-3941][TableAPI]Add support for UNION (w...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2025#discussion_r64543161
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala ---
    @@ -69,16 +73,23 @@ class DataSetUnion(
           rows + metadata.getRowCount(child)
         }
     
    -    planner.getCostFactory.makeCost(rowCnt, 0, 0)
    +    planner.getCostFactory.makeCost(
    +      rowCnt,
    +      if (all) 0 else rowCnt,
    +      if (all) 0 else rowCnt)
       }
     
       override def translateToPlan(
           tableEnv: BatchTableEnvironment,
           expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
     
    -    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
    -    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
    -    leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
    +    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    +    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    --- End diff --
    
    With this fix, we should be able to remove all guards for efficient type usage in the Union ITCases.
    There are a few like the following:
    ```
    if (tEnv.getConfig.getEfficientTypeUsage) {
      return
    }
    ```



[GitHub] flink pull request: [FLINK-3941][TableAPI]Add support for UNION (w...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/2025#issuecomment-221186678
  
    Thanks for the PR, @yjshen. Looks good except for a few minor comments.
    Can you also add one test method to the Scala SQL UnionITCase, such that the SQL side is also covered?
    
    Union on streams cannot be supported right now. It would need a lot of the windowing logic to deduplicate rows.
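Here is a rough sketch of why deduplicating a stream needs windowing. It is plain Scala, not the DataStream API, and it assumes in-order event timestamps and a hypothetical `Event` type. An unbounded stream cannot remember every row it has seen, so state is kept only per tumbling window and cleared when the window closes. As a result, duplicates that straddle two windows are not removed.

```scala
// Hypothetical in-order event stream; not Flink's DataStream API.
object WindowedDedup {
  final case class Event(timestamp: Long, value: String)

  // Deduplicate within tumbling windows of `windowSize` ms. Assumes events
  // arrive in timestamp order; seen values are forgotten when a window closes.
  def dedupTumbling(events: Iterator[Event], windowSize: Long): Iterator[Event] = {
    var currentWindow = Long.MinValue
    val seen = scala.collection.mutable.HashSet.empty[String]
    events.filter { e =>
      val window = e.timestamp / windowSize
      if (window != currentWindow) {
        currentWindow = window
        seen.clear() // previous window closed: drop its state
      }
      seen.add(e.value) // true only for the first occurrence in this window
    }
  }
}
```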
    
    Thanks, Fabian



[GitHub] flink pull request: [FLINK-3941][TableAPI]Add support for UNION (w...

Posted by yjshen <gi...@git.apache.org>.
Github user yjshen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2025#discussion_r64547780
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala ---
    @@ -69,16 +73,23 @@ class DataSetUnion(
           rows + metadata.getRowCount(child)
         }
     
    -    planner.getCostFactory.makeCost(rowCnt, 0, 0)
    +    planner.getCostFactory.makeCost(
    +      rowCnt,
    +      if (all) 0 else rowCnt,
    +      if (all) 0 else rowCnt)
       }
     
       override def translateToPlan(
           tableEnv: BatchTableEnvironment,
           expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
     
    -    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
    -    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
    -    leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
    +    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    +    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    +    if (all) {
    +      leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
    +    } else {
    +      leftDataSet.union(rightDataSet).distinct().asInstanceOf[DataSet[Any]]
    --- End diff --
    
    I have a question about `computeSelfCost`: when is this method called? Is it possible that union's `computeSelfCost` is used first, before `UnionToDistinctRule` is called? I tried to understand this last night but couldn't figure it out.



[GitHub] flink pull request: [FLINK-3941][TableAPI]Add support for UNION (w...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2025#discussion_r64540766
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala ---
    @@ -69,16 +73,23 @@ class DataSetUnion(
           rows + metadata.getRowCount(child)
         }
     
    -    planner.getCostFactory.makeCost(rowCnt, 0, 0)
    +    planner.getCostFactory.makeCost(
    +      rowCnt,
    +      if (all) 0 else rowCnt,
    +      if (all) 0 else rowCnt)
       }
     
       override def translateToPlan(
           tableEnv: BatchTableEnvironment,
           expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
     
    -    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
    -    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
    -    leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
    +    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    +    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    --- End diff --
    
    Good catch!



[GitHub] flink pull request: [FLINK-3941][TableAPI]Add support for UNION (w...

Posted by yjshen <gi...@git.apache.org>.
Github user yjshen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2025#discussion_r64546723
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala ---
    @@ -69,16 +73,23 @@ class DataSetUnion(
           rows + metadata.getRowCount(child)
         }
     
    -    planner.getCostFactory.makeCost(rowCnt, 0, 0)
    +    planner.getCostFactory.makeCost(
    +      rowCnt,
    +      if (all) 0 else rowCnt,
    +      if (all) 0 else rowCnt)
       }
     
       override def translateToPlan(
           tableEnv: BatchTableEnvironment,
           expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
     
    -    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
    -    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
    -    leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
    +    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    +    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    --- End diff --
    
    Ah yes, I didn't notice that, will do :)



[GitHub] flink pull request: [FLINK-3941][TableAPI]Add support for UNION (w...

Posted by yjshen <gi...@git.apache.org>.
Github user yjshen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2025#discussion_r64371728
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala ---
    @@ -44,7 +44,7 @@ class AggregateMapFunction[IN, OUT](
     
       override def map(value: IN): OUT = {
         
    -    val input = value.asInstanceOf[Row]
    +    val input = value.asInstanceOf[Product]
    --- End diff --
    
    In `testUnion` with `Execution mode = COLLECTION, Table config = EFFICIENT`, `value` is of type `scala.Tuple3`. Doesn't that break the cast to `Row`?



[GitHub] flink pull request: [FLINK-3941][TableAPI]Add support for UNION (w...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2025#discussion_r64339091
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UnionITCase.scala ---
    @@ -54,7 +54,22 @@ class UnionITCase(
       }
     
       @Test
    -  def testTernaryUnion(): Unit = {
    +  def testUnion(): Unit = {
    --- End diff --
    
    There were some concerns about the build time of the `flink-table` module lately. In the past, we added quite a few integration tests without thinking about the added coverage vs. build time trade-off.
    
    I propose replacing the `testUnionWithFilter()` and `testUnionWithJoin()` tests with your new methods, because those two tests do not add to test coverage, IMO.



[GitHub] flink pull request: [FLINK-3941][TableAPI]Add support for UNION (w...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/2025#issuecomment-221523124
  
    Hi, thanks for the update and the `expectedType` fix!
    Can you extend it as I described in the comment and do either of the two options regarding the `UnionToDistinctRule`?
    
    Thanks!



[GitHub] flink pull request: [FLINK-3941][TableAPI]Add support for UNION (w...

Posted by yjshen <gi...@git.apache.org>.
Github user yjshen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2025#discussion_r64377174
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala ---
    @@ -44,7 +44,7 @@ class AggregateMapFunction[IN, OUT](
     
       override def map(value: IN): OUT = {
         
    -    val input = value.asInstanceOf[Row]
    +    val input = value.asInstanceOf[Product]
    --- End diff --
    
    Ah, I think I see why this happens. I will fix it.



[GitHub] flink pull request: [FLINK-3941][TableAPI]Add support for UNION (w...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/2025#issuecomment-221832119
  
    Merging



[GitHub] flink pull request: [FLINK-3941][TableAPI]Add support for UNION (w...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2025#discussion_r64338444
  
    --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/UnionITCase.java ---
    @@ -43,7 +43,7 @@ public UnionITCase(TestExecutionMode mode) {
     	}
     
     	@Test
    -	public void testUnion() throws Exception {
    +	public void testUnionAll() throws Exception {
    --- End diff --
    
    Actually, I would completely remove the whole Java `UnionITCase.java` file. It tests exactly the same methods as the Scala tests (no expression parsing involved). 

