Posted to issues@flink.apache.org by wuchong <gi...@git.apache.org> on 2017/07/07 08:17:14 UTC

[GitHub] flink pull request #4279: [FLINK-7126] [table] Support Distinct for Stream S...

GitHub user wuchong opened a pull request:

    https://github.com/apache/flink/pull/4279

    [FLINK-7126] [table] Support Distinct for Stream SQL and Table API

    DISTINCT is syntactic sugar that Calcite translates into a GROUP BY. Since GROUP BY is already supported on streaming tables, DISTINCT is supported as well.
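The translation described above can be pictured with a small plain-Scala sketch. This is not Flink code. `StreamingDistinct`, `onRow` and `stateSize` are hypothetical names, and the model assumes an append-only input. It treats DISTINCT as a grouped aggregation with no aggregate functions. The set of keys seen so far plays the role of per-group state, and a row is emitted only the first time its key appears.

```scala
import scala.collection.mutable

// Hypothetical toy model, not Flink's implementation: DISTINCT on an
// append-only stream, evaluated like a GROUP BY over all selected fields.
final class StreamingDistinct[K] {
  // Plays the role of keyed state: one entry per distinct group seen so far.
  private val seen = mutable.Set.empty[K]

  // Called once per incoming row; returns the row to emit, if any.
  def onRow(key: K): Option[K] =
    if (seen.add(key)) Some(key) // first row of this group: emit it
    else None                    // group already emitted: emit nothing

  // Number of groups held in state; grows with the number of distinct keys.
  def stateSize: Int = seen.size
}

object StreamingDistinctDemo {
  def main(args: Array[String]): Unit = {
    val op = new StreamingDistinct[Int]
    // Sample values of one selected column, e.g. for `SELECT DISTINCT b`.
    val emitted = Seq(1, 2, 2, 3, 3, 3).flatMap(x => op.onRow(x).toList)
    println(emitted.mkString(", ")) // prints "1, 2, 3"
    println(op.stateSize)           // prints "3"
  }
}
```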
    
    
    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following checklist into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [ ] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [ ] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/wuchong/flink distinct

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4279.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4279
    
----
commit dbd9ad9e244fd653dc6e512807f0505e4e3367df
Author: Jark Wu <ja...@apache.org>
Date:   2017-07-07T08:10:45Z

    [FLINK-7126] [table] Support Distinct for Stream SQL and Table API

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4279: [FLINK-7126] [table] Support Distinct for Stream S...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4279#discussion_r126685098
  
    --- Diff: docs/dev/table/sql.md ---
    @@ -284,12 +284,14 @@ FROM Orders
         <tr>
           <td>
             <strong>Distinct</strong><br>
    -        <span class="label label-primary">Batch</span>
    +        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span> <br>
    +        <span class="label label-info">Result Updating</span>
           </td>
           <td>
     {% highlight sql %}
     SELECT DISTINCT users FROM Orders
     {% endhighlight %}
    +       <p><b>Note:</b> DISTINCT on a streaming table produces an updating result. And for streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming.html">Streaming Concepts</a> for details.</p>
    --- End diff --
    
    I think we do not need the first sentence (it should be covered by the "Result Updating" tag).
    Change `And for streaming queries the required ...` to `For streaming queries the required ...`.
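The state-growth concern in this note can be made concrete with a small plain-Scala sketch. It is a simplified, hypothetical model. `DistinctWithRetention`, `retentionMs` and the explicit `now` timestamps are illustrative assumptions, not Flink's API. Each distinct key remembers when it was last seen, and keys that stay idle longer than the retention interval are dropped. Flink's `StreamQueryConfig.withIdleStateRetentionTime(min, max)`, used in the tests in this thread, takes a minimum and a maximum. This sketch collapses them into a single interval.

```scala
import scala.collection.mutable

// Hypothetical model, not Flink code: DISTINCT whose per-key state is
// cleaned up once a key has been idle for longer than `retentionMs`.
final class DistinctWithRetention[K](retentionMs: Long) {
  // Last time (in ms) each distinct key was seen; one entry per live key.
  private val lastSeen = mutable.Map.empty[K, Long]

  // Processes one row at time `now`; returns the key if it must be emitted.
  def onRow(key: K, now: Long): Option[K] = {
    expire(now)
    val isNew = !lastSeen.contains(key)
    lastSeen(key) = now // refresh the key's idle timer
    if (isNew) Some(key) else None
  }

  // Drops every key that has been idle for longer than the retention interval.
  private def expire(now: Long): Unit = {
    val idle = lastSeen.filter { case (_, t) => now - t > retentionMs }.keys.toList
    lastSeen --= idle
  }

  def stateSize: Int = lastSeen.size
}

object RetentionDemo {
  def main(args: Array[String]): Unit = {
    val op = new DistinctWithRetention[String](retentionMs = 1000L)
    println(op.onRow("a", now = 0L))    // Some(a): first occurrence
    println(op.onRow("a", now = 500L))  // None: duplicate within retention
    println(op.onRow("b", now = 2000L)) // Some(b); "a" (idle 1500 ms) is dropped
    println(op.stateSize)               // 1
    println(op.onRow("a", now = 2100L)) // Some(a): emitted again after cleanup
  }
}
```

The last call shows the trade-off in this model. A retention interval bounds the state, but a key that reappears after its state was cleaned up is emitted a second time.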



[GitHub] flink pull request #4279: [FLINK-7126] [table] Support Distinct for Stream S...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4279



[GitHub] flink issue #4279: [FLINK-7126] [table] Support Distinct for Stream SQL and ...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on the issue:

    https://github.com/apache/flink/pull/4279
  
    I'll merge this PR.



[GitHub] flink pull request #4279: [FLINK-7126] [table] Support Distinct for Stream S...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4279#discussion_r126534385
  
    --- Diff: docs/dev/table/tableApi.md ---
    @@ -373,7 +373,7 @@ Table result = orders
         <tr>
           <td>
             <strong>Distinct</strong><br>
    -        <span class="label label-primary">Batch</span>
    +        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
    --- End diff --
    
    This also needs the `Result Updating` label.



[GitHub] flink issue #4279: [FLINK-7126] [table] Support Distinct for Stream SQL and ...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on the issue:

    https://github.com/apache/flink/pull/4279
  
    Thanks @fhueske, I have addressed your comments.



[GitHub] flink pull request #4279: [FLINK-7126] [table] Support Distinct for Stream S...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4279#discussion_r126617388
  
    --- Diff: docs/dev/table/tableApi.md ---
    @@ -373,7 +373,7 @@ Table result = orders
         <tr>
           <td>
             <strong>Distinct</strong><br>
    -        <span class="label label-primary">Batch</span>
    +        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
    --- End diff --
    
    If the input is append/insert only, distinct will only produce insert changes as well. 
    However, I don't think we make this distinction in the decoration phase. AFAIK, it will always be marked to produce updates right now.
    
    IMO, it would be good if DISTINCT on append-only input were not treated as producing updates. We don't have to fix it in this PR, but I think it would be good to make such a change. What do you think @wuchong?
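The point about append-only input can be checked with a small retraction model in plain Scala. This is a hypothetical sketch: `Change`, `Insert`, `Delete` and `RetractingDistinct` are illustrative names, not Flink classes. DISTINCT keeps a count of live rows per key. It emits an insert when a key's count goes from 0 to 1, and a delete when the count drops back to 0. With append-only input the count never decreases, so only inserts come out. A delete can appear only when the input itself retracts rows.

```scala
import scala.collection.mutable

// Hypothetical change-log model (not Flink's classes).
sealed trait Change[K] { def key: K }
final case class Insert[K](key: K) extends Change[K]
final case class Delete[K](key: K) extends Change[K]

// DISTINCT as a grouped count: emit a change only when a key appears or disappears.
final class RetractingDistinct[K] {
  private val counts = mutable.Map.empty[K, Long]

  def onChange(in: Change[K]): Option[Change[K]] = in match {
    case Insert(k) =>
      val n = counts.getOrElse(k, 0L) + 1
      counts(k) = n
      if (n == 1) Some(Insert(k)) else None // 0 -> 1: the key becomes visible
    case Delete(k) =>
      val n = counts.getOrElse(k, 0L) - 1
      if (n <= 0) { counts -= k } else { counts(k) = n }
      if (n == 0) Some(Delete(k)) else None // 1 -> 0: the key disappears
  }
}

object ChangeModeDemo {
  def main(args: Array[String]): Unit = {
    // Append-only input: the output contains inserts only.
    val d1 = new RetractingDistinct[String]
    val appendOnly: Seq[Change[String]] = Seq(Insert("x"), Insert("y"), Insert("x"))
    println(appendOnly.flatMap(c => d1.onChange(c).toList)) // List(Insert(x), Insert(y))

    // Updating input: retracting the only "y" makes DISTINCT emit a delete.
    val d2 = new RetractingDistinct[String]
    val updating: Seq[Change[String]] = Seq(Insert("x"), Insert("y"), Delete("y"))
    println(updating.flatMap(c => d2.onChange(c).toList)) // List(Insert(x), Insert(y), Delete(y))
  }
}
```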



[GitHub] flink issue #4279: [FLINK-7126] [table] Support Distinct for Stream SQL and ...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on the issue:

    https://github.com/apache/flink/pull/4279
  
    Thanks @sunjincheng121, I have updated the Table API documentation.



[GitHub] flink issue #4279: [FLINK-7126] [table] Support Distinct for Stream SQL and ...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on the issue:

    https://github.com/apache/flink/pull/4279
  
    Hi @wuchong, thanks for the update.
    +1 to merge.



[GitHub] flink issue #4279: [FLINK-7126] [table] Support Distinct for Stream SQL and ...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on the issue:

    https://github.com/apache/flink/pull/4279
  
    Updated



[GitHub] flink issue #4279: [FLINK-7126] [table] Support Distinct for Stream SQL and ...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on the issue:

    https://github.com/apache/flink/pull/4279
  
    merging ...



[GitHub] flink pull request #4279: [FLINK-7126] [table] Support Distinct for Stream S...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4279#discussion_r126535807
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala ---
    @@ -38,6 +38,43 @@ class GroupAggregationsITCase extends StreamingWithStateTestBase {
       private val queryConfig = new StreamQueryConfig()
       queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))
     
    +
    +  @Test
    +  def testDistinct(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setStateBackend(getStateBackend)
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.clear
    +
    +    val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
    +      .select('b).distinct()
    +
    +    val results = t.toRetractStream[Row](queryConfig)
    +    results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
    +    env.execute()
    +
    +    val expected = mutable.MutableList("1", "2", "3", "4", "5", "6")
    +    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
    +  }
    +
    +  @Test
    +  def testDistinctAfterAggregate(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setStateBackend(getStateBackend)
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.clear
    +
    +    val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
    +      .groupBy('a, 'e).select('e).distinct()
    --- End diff --
    
    I think it would be better if the first aggregation produced updates, e.g., `.groupBy('e).select('e, 'a.count()).distinct()`. The count aggregation emits changing count values, which the DISTINCT then has to retract and re-accumulate.
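The reasoning behind this suggestion can be sketched in plain Scala. This is a hypothetical model, not Flink code. Messages follow the retract-stream convention also used by `toRetractStream` in the test, as `(flag, row)` pairs where `true` adds a row and `false` retracts it. An upstream per-key count retracts its previous result on every new row. A DISTINCT placed after it therefore really has to process retractions, which a grouping without aggregates on append-only input would never send. `CountThenDistinct`, `countPerKey` and `distinct` are illustrative names.

```scala
import scala.collection.mutable

// Hypothetical model, not Flink code. Messages follow the retract-stream
// convention (flag, row): true = add the row, false = retract it.
object CountThenDistinct {
  type Msg[R] = (Boolean, R)

  // Upstream `groupBy('e).select('e, 'a.count)`: every new row for key `e`
  // retracts the previous (e, count) row and adds (e, count + 1).
  def countPerKey(keys: Seq[String]): Seq[Msg[(String, Long)]] = {
    val counts = mutable.Map.empty[String, Long]
    keys.flatMap { e =>
      val old = counts.getOrElse(e, 0L)
      counts(e) = old + 1
      val retraction = if (old > 0) List((false, (e, old))) else Nil
      retraction :+ ((true, (e, old + 1)))
    }
  }

  // Downstream DISTINCT over (e, count): reference-count each row and emit a
  // change only when a row becomes visible or disappears.
  def distinct[R](in: Seq[Msg[R]]): Seq[Msg[R]] = {
    val refs = mutable.Map.empty[R, Int]
    in.flatMap { case (isAdd, row) =>
      val before = refs.getOrElse(row, 0)
      val after = if (isAdd) before + 1 else before - 1
      if (after == 0) { refs -= row } else { refs(row) = after }
      if (before == 0 && after == 1) List((true, row))
      else if (before == 1 && after == 0) List((false, row))
      else Nil
    }
  }

  def main(args: Array[String]): Unit = {
    val counted = countPerKey(Seq("x", "y", "x"))
    // Prints (true,(x,1)), (true,(y,1)), (false,(x,1)), (true,(x,2)), one per line.
    distinct(counted).foreach(println)
  }
}
```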



[GitHub] flink pull request #4279: [FLINK-7126] [table] Support Distinct for Stream S...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4279#discussion_r126534907
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/AggregationsTest.scala ---
    @@ -39,4 +40,42 @@ class AggregationsTest extends TableTestBase {
     
         streamUtil.tEnv.sql(sqlQuery)
       }
    +
    +  @Test
    +  def testDistinct(): Unit = {
    +    val sql = "SELECT DISTINCT a, b, c FROM MyTable"
    +
    +    val expected =
    +      unaryNode(
    +        "DataStreamGroupAggregate",
    +        streamTableNode(0),
    +        term("groupBy", "a, b, c"),
    +        term("select", "a, b, c")
    +      )
    +    streamUtil.verifySql(sql, expected)
    +  }
    +
    +  @Test
    +  def testDistinctAfterAggregate(): Unit = {
    +    val sql = "SELECT DISTINCT a FROM MyTable GROUP BY a, b, c"
    +
    +    val expected =
    +      unaryNode(
    +        "DataStreamGroupAggregate",
    --- End diff --
    
    Shouldn't this query be optimized to have only a single `DataStreamGroupAggregate`? @wuchong, can you check this and open a JIRA if that is the case? Thank you.



[GitHub] flink pull request #4279: [FLINK-7126] [table] Support Distinct for Stream S...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4279#discussion_r126589666
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/AggregationsTest.scala ---
    @@ -39,4 +40,42 @@ class AggregationsTest extends TableTestBase {
     
         streamUtil.tEnv.sql(sqlQuery)
       }
    +
    +  @Test
    +  def testDistinct(): Unit = {
    +    val sql = "SELECT DISTINCT a, b, c FROM MyTable"
    +
    +    val expected =
    +      unaryNode(
    +        "DataStreamGroupAggregate",
    +        streamTableNode(0),
    +        term("groupBy", "a, b, c"),
    +        term("select", "a, b, c")
    +      )
    +    streamUtil.verifySql(sql, expected)
    +  }
    +
    +  @Test
    +  def testDistinctAfterAggregate(): Unit = {
    +    val sql = "SELECT DISTINCT a FROM MyTable GROUP BY a, b, c"
    +
    +    val expected =
    +      unaryNode(
    +        "DataStreamGroupAggregate",
    --- End diff --
    
    Thanks. I don't think we have this optimization right now, so I filed a JIRA: FLINK-7144.
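The optimization discussed here rests on a simple set equivalence. Grouping by `a, b, c` and then taking the distinct values of `a` yields the same set as `SELECT DISTINCT a` over the input, so one grouping on `a` would be enough. Below is a plain-Scala check of that equivalence on sample rows. It illustrates the semantics only and is not planner code. `Row3`, `twoSteps` and `oneStep` are hypothetical names.

```scala
// Hypothetical illustration, not planner code: a DISTINCT on top of
// GROUP BY a, b, c returns the same set as a single grouping on `a`.
final case class Row3(a: Int, b: String, c: Long)

object DistinctOverGroupBy {
  // Two aggregations: GROUP BY a, b, c, then DISTINCT a over the group keys.
  def twoSteps(rows: Seq[Row3]): Set[Int] =
    rows.groupBy(r => (r.a, r.b, r.c)).keySet.map(_._1)

  // One aggregation: DISTINCT a (equivalently GROUP BY a) over the input.
  def oneStep(rows: Seq[Row3]): Set[Int] =
    rows.map(_.a).toSet

  def main(args: Array[String]): Unit = {
    val rows = Seq(Row3(1, "x", 10L), Row3(1, "y", 10L), Row3(2, "x", 20L), Row3(1, "x", 10L))
    println(twoSteps(rows) == oneStep(rows)) // true
  }
}
```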



[GitHub] flink issue #4279: [FLINK-7126] [table] Support Distinct for Stream SQL and ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/4279
  
    +1 to merge



[GitHub] flink pull request #4279: [FLINK-7126] [table] Support Distinct for Stream S...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4279#discussion_r126532548
  
    --- Diff: docs/dev/table/sql.md ---
    @@ -284,7 +284,7 @@ FROM Orders
         <tr>
           <td>
             <strong>Distinct</strong><br>
    -        <span class="label label-primary">Batch</span>
    +        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
    --- End diff --
    
    Please add the `Result Updating` label:
    ```
    <span class="label label-info">Result Updating</span>
    ```
    
    If the input table of a Distinct operation produces updates, Distinct might also produce updates, i.e., deletes.



[GitHub] flink pull request #4279: [FLINK-7126] [table] Support Distinct for Stream S...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4279#discussion_r126589762
  
    --- Diff: docs/dev/table/tableApi.md ---
    @@ -373,7 +373,7 @@ Table result = orders
         <tr>
           <td>
             <strong>Distinct</strong><br>
    -        <span class="label label-primary">Batch</span>
    +        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
    --- End diff --
    
    I think Distinct always produces updates, regardless of whether the input table produces updates, because DISTINCT is translated into a GroupBy.



[GitHub] flink pull request #4279: [FLINK-7126] [table] Support Distinct for Stream S...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4279#discussion_r126135016
  
    --- Diff: docs/dev/table/sql.md ---
    @@ -284,7 +284,7 @@ FROM Orders
         <tr>
           <td>
             <strong>Distinct</strong><br>
    -        <span class="label label-primary">Batch</span>
    +        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
    --- End diff --
    
    Please update tableAPI.md as well.

