Posted to issues@flink.apache.org by walterddr <gi...@git.apache.org> on 2017/08/10 18:00:21 UTC

[GitHub] flink pull request #4521: [FLINK-7357] [table] Created extended rules for Wi...

GitHub user walterddr opened a pull request:

    https://github.com/apache/flink/pull/4521

    [FLINK-7357] [table] Created extended rules for WindowStartEndPropertiesRule on Having clause

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/walterddr/flink FLINK-7357

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4521.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4521
    
----
commit 12e6737e3612de94885cc8c16d341b7e2c607370
Author: Rong Rong <ro...@uber.com>
Date:   2017-08-10T17:46:25Z

    make WindowStartEndPropertiesRule abstract and create extended rules for Having clause

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4521: [FLINK-7357] [table] Created extended rules for Wi...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4521#discussion_r135255639
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala ---
    @@ -836,6 +836,52 @@ class OverWindowITCase extends StreamingWithStateTestBase {
         )
         assertEquals(expected.sorted, StreamITCase.testResults.sorted)
       }
    +
    +  @Test
    +  def testHopStartEndWithHaving(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +    env.setStateBackend(getStateBackend)
    +    StreamITCase.clear
    +    env.setParallelism(1)
    +
    +    val sqlQueryHopStartEndWithHaving = "SELECT \n" +
    +      "  c AS k, \n" +
    +      "  COUNT(a) AS v, \n" +
    +      "  HOP_START(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) AS windowStart, \n" +
    +      "  HOP_END(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) AS windowEnd \n" +
    +      "FROM \n" +
    +      "  T1 \n" +
    +      "GROUP BY \n" +
    +      "  HOP(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE), \n" +
    +      "  c \n" +
    +      "HAVING \n" +
    +      "  SUM(b) > 1 \n"
    +
    +    val data = Seq(
    +      Left(14000005L, (1, 1L, "Hi")),
    +      Left(14000000L, (2, 1L, "Hello")),
    +      Left(14000002L, (3, 1L, "Hello")),
    +      Right(14000010L)
    +    )
    +
    +    val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
    +      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
    +
    +    tEnv.registerTable("T1", t1)
    +
    +    val resultHopStartEndWithHaving = tEnv.sql(sqlQueryHopStartEndWithHaving).toAppendStream[Row]
    +    resultHopStartEndWithHaving.addSink(new StreamITCase.StringSink[Row])
    +
    +    env.execute()
    +
    +    val expected = List(
    +      "Hello,2,1970-01-01 03:53:00.0,1970-01-01 03:54:00.0",
    +      "Hi,1,1970-01-01 03:53:00.0,1970-01-01 03:54:00.0"
    --- End diff --
    
    This result is not correct. If the count is `1` and we filter for sums over `b` larger than `1`, we should not include this result.
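    The sketch below evaluates the test data by hand to make the expected semantics concrete. It is a minimal Scala sketch and not part of the PR. It assumes that a HOP window whose slide equals its size (1 minute here) assigns each row to exactly one window. It also models rows with a hypothetical `Event` case class instead of Flink's `Row`.

    ```scala
    // Hypothetical, simplified model of testHopStartEndWithHaving (not Flink code).
    // With slide == size == 1 minute, HOP behaves like a tumbling window.
    object HavingSketch {
      final case class Event(ts: Long, a: Int, b: Long, c: String)

      val WindowSizeMs: Long = 60000L

      // Align a timestamp to the start of its 1-minute window.
      def windowStart(ts: Long): Long = ts - (ts % WindowSizeMs)

      // Returns (c, COUNT(a), windowStart, windowEnd) for groups with SUM(b) > 1.
      def evaluate(events: Seq[Event]): Seq[(String, Long, Long, Long)] =
        events
          .groupBy(e => (windowStart(e.ts), e.c))           // GROUP BY HOP(...), c
          .toSeq
          .filter { case (_, rows) => rows.map(_.b).sum > 1 } // HAVING SUM(b) > 1
          .map { case ((start, c), rows) =>
            (c, rows.size.toLong, start, start + WindowSizeMs)
          }
          .sortBy(_._1)

      // Same rows as the test's EventTimeSourceFunction input (watermark omitted).
      val data: Seq[Event] = Seq(
        Event(14000005L, 1, 1L, "Hi"),
        Event(14000000L, 2, 1L, "Hello"),
        Event(14000002L, 3, 1L, "Hello"))
    }
    ```

    Only the `Hello` group (COUNT 2, SUM(b) 2) survives `HAVING SUM(b) > 1`. The `Hi` group has SUM(b) = 1 and is dropped. The window bounds 13980000 ms and 14040000 ms correspond to the 03:53:00 and 03:54:00 values in the expected strings.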


---

[GitHub] flink pull request #4521: [FLINK-7357] [table] Created extended rules for Wi...

Posted by walterddr <gi...@git.apache.org>.
Github user walterddr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4521#discussion_r135321169
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala ---
    @@ -143,4 +143,36 @@ class GroupWindowTest extends TableTestBase {
     
         streamUtil.verifySql(sql, expected)
       }
    +
    +  @Test
    +  def testExpressionOnWindowHavingFunction() = {
    +    val sql =
    +      "SELECT " +
    +        "  COUNT(*), " +
    +        "  HOP_START(rowtime, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE) " +
    +        "FROM MyTable " +
    +        "GROUP BY HOP(rowtime, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE) " +
    +        "HAVING SUM(a) > 0"
    +    val expected =
    +      unaryNode(
    +        "DataStreamCalc",
    +        unaryNode(
    +          "DataStreamGroupWindowAggregate",
    +          unaryNode(
    +            "DataStreamCalc",
    +            streamTableNode(0),
    +            term("select", "rowtime, a")
    +          ),
    +          term("window", SlidingGroupWindow('w$, 'rowtime, 60000.millis, 900000.millis)),
    +          term("select",
    +            "COUNT(*) AS EXPR$0",
    +            "SUM(a) AS $f1",
    +            "start('w$) AS w$start",
    +            "end('w$) AS w$end")
    +        ),
    +        term("select", "EXPR$0", "w$start")
    --- End diff --
    
    Ahh, good catch. Bonehead, bonehead :-( Will fix it.


---

[GitHub] flink issue #4521: [FLINK-7357] [table] Created extended rules for WindowSta...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/4521
  
    Thanks for the PR @walterddr. 
    
    We also need to replace group auxiliary functions (such as `HOP_START`) inside the expressions of the filter predicate, to support something like this:
    
    ```
    HAVING 
      SUM(a) > 0 AND
      QUARTER(HOP_START(ts, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE)) = 1
    ```
    
    But that's just a minor change. 
    I'll fix that, extend the tests to cover the case, and do some refactorings before I merge this PR.
    
    Cheers, Fabian
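    Here is a minimal Scala sketch of that rewrite. It uses a hypothetical mini expression tree (`Expr`, `Call`, `FieldRef`, `Literal`) instead of Calcite's `RexNode` classes. Every group auxiliary call is replaced, at any nesting depth, by a reference to the window property field that the aggregate produces. The field names `w$start` and `w$end` are borrowed from the plans in this PR's tests. `$f1` stands for `SUM(a)`, as it does in those plans.

    ```scala
    // Hypothetical mini expression tree; NOT Calcite's RexNode API.
    object AuxRewriteSketch {
      sealed trait Expr
      final case class Call(op: String, args: List[Expr]) extends Expr
      final case class FieldRef(name: String) extends Expr
      final case class Literal(value: Any) extends Expr

      // Group auxiliary functions and the window property field each maps to.
      // Other window types would be added the same way (assumption).
      private val auxToProperty: Map[String, String] = Map(
        "HOP_START" -> "w$start",
        "HOP_END" -> "w$end")

      // Recursively replaces every auxiliary call, wherever it is nested,
      // with a reference to the corresponding window property field.
      def rewrite(e: Expr): Expr = e match {
        case Call(op, _) if auxToProperty.contains(op) => FieldRef(auxToProperty(op))
        case Call(op, args)                            => Call(op, args.map(rewrite))
        case other                                     => other
      }
    }
    ```

    Recursing into the arguments is what handles the nested `QUARTER(HOP_START(...))` case. A rewrite that inspected only the top-level operands of `AND` would miss it.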


---

[GitHub] flink issue #4521: [FLINK-7357] [table] Created extended rules for WindowSta...

Posted by walterddr <gi...@git.apache.org>.
Github user walterddr commented on the issue:

    https://github.com/apache/flink/pull/4521
  
    @twalthr, it looks like this resolves the last filter issue. Please take a look and let me know whether it is good to go in :-)


---

[GitHub] flink pull request #4521: [FLINK-7357] [table] Created extended rules for Wi...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4521#discussion_r135256246
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala ---
    @@ -147,7 +147,8 @@ object FlinkRuleSets {
     
         // Transform window to LogicalWindowAggregate
         DataSetLogicalWindowAggregateRule.INSTANCE,
    -    WindowStartEndPropertiesRule.INSTANCE
    +    WindowStartEndPropertiesRule.INSTANCE,
    +    WindowStartEndPropertiesHavingRule.INSTANCE
    --- End diff --
    
    Please also add tests for the batch logical plan.
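    The commit behind this PR makes `WindowStartEndPropertiesRule` abstract and adds a variant for the HAVING clause, and the diff above registers both instances. The Scala sketch below illustrates that structure. It uses hypothetical, simplified plan classes rather than Calcite's `RelOptRule` operands: the shared logic lives in an abstract base, and each subclass only describes the plan shape it matches. It assumes that a HAVING clause appears as a `Filter` between the `Project` and the window aggregate. The operand trees of the actual rules may differ.

    ```scala
    // Hypothetical, simplified plan nodes; NOT Calcite's RelNode/RelOptRule API.
    object RuleSetSketch {
      sealed trait Plan
      final case class Project(input: Plan) extends Plan
      final case class Filter(input: Plan) extends Plan
      final case class WindowAggregate(window: String) extends Plan
      final case class Scan(table: String) extends Plan

      // Shared logic lives in the abstract base; subclasses only say which
      // plan shape they match and where the window aggregate sits in it.
      abstract class StartEndRuleSketch(val name: String) {
        protected def extractWindowAgg(plan: Plan): Option[WindowAggregate]
        final def matches(plan: Plan): Boolean = extractWindowAgg(plan).isDefined
      }

      // Query without HAVING: Project directly over the window aggregate.
      object PlainRule extends StartEndRuleSketch("WindowStartEndPropertiesRule") {
        protected def extractWindowAgg(plan: Plan): Option[WindowAggregate] = plan match {
          case Project(agg: WindowAggregate) => Some(agg)
          case _                             => None
        }
      }

      // Query with HAVING: a Filter sits between the Project and the aggregate.
      object HavingRule extends StartEndRuleSketch("WindowStartEndPropertiesHavingRule") {
        protected def extractWindowAgg(plan: Plan): Option[WindowAggregate] = plan match {
          case Project(Filter(agg: WindowAggregate)) => Some(agg)
          case _                                     => None
        }
      }

      // Both instances are registered side by side, as in the FlinkRuleSets diff.
      val rules: List[StartEndRuleSketch] = List(PlainRule, HavingRule)

      def matchingRules(plan: Plan): List[String] =
        rules.filter(_.matches(plan)).map(_.name)
    }
    ```

    Because both rules are registered, a windowed query is rewritten whether or not it has a HAVING clause. Each rule stays a small pattern on top of the shared base.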


---

[GitHub] flink pull request #4521: [FLINK-7357] [table] Created extended rules for Wi...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4521#discussion_r135254243
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala ---
    @@ -143,4 +143,36 @@ class GroupWindowTest extends TableTestBase {
     
         streamUtil.verifySql(sql, expected)
       }
    +
    +  @Test
    +  def testExpressionOnWindowHavingFunction() = {
    +    val sql =
    +      "SELECT " +
    +        "  COUNT(*), " +
    +        "  HOP_START(rowtime, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE) " +
    +        "FROM MyTable " +
    +        "GROUP BY HOP(rowtime, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE) " +
    +        "HAVING SUM(a) > 0"
    +    val expected =
    +      unaryNode(
    +        "DataStreamCalc",
    +        unaryNode(
    +          "DataStreamGroupWindowAggregate",
    +          unaryNode(
    +            "DataStreamCalc",
    +            streamTableNode(0),
    +            term("select", "rowtime, a")
    +          ),
    +          term("window", SlidingGroupWindow('w$, 'rowtime, 60000.millis, 900000.millis)),
    +          term("select",
    +            "COUNT(*) AS EXPR$0",
    +            "SUM(a) AS $f1",
    +            "start('w$) AS w$start",
    +            "end('w$) AS w$end")
    +        ),
    +        term("select", "EXPR$0", "w$start")
    --- End diff --
    
    Isn't your condition `> 0` missing in your logical plan?


---

[GitHub] flink pull request #4521: [FLINK-7357] [table] Created extended rules for Wi...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4521


---