You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Jark Wu (Jira)" <ji...@apache.org> on 2021/01/16 11:57:00 UTC

[jira] [Closed] (FLINK-20909) MiniBatch Interval derivation does not work well when enable miniBatch optimization in a job which contains deduplicate on row and unbounded aggregate.

     [ https://issues.apache.org/jira/browse/FLINK-20909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jark Wu closed FLINK-20909.
---------------------------
    Resolution: Fixed

Fixed in master: d1d78e72220e2938bf5af77420af16c477c96e29

> MiniBatch Interval derivation does not work well when enable miniBatch optimization in a job which contains deduplicate on row and unbounded aggregate.
> -------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-20909
>                 URL: https://issues.apache.org/jira/browse/FLINK-20909
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>    Affects Versions: 1.12.0
>            Reporter: Andy
>            Assignee: Andy
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.13.0
>
>
> MiniBatch Interval derivation does not work well when enable miniBatch optimization in a job which contains deduplicate on row and unbounded aggregate.
> {code:java}
> @Test
> def testLastRowOnRowtime1(): Unit = {
>   val t = env.fromCollection(rowtimeTestData)
>     .assignTimestampsAndWatermarks(new RowtimeExtractor)
>     .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime())
>   tEnv.registerTable("T", t)
>   tEnv.executeSql(
>     s"""
>        |CREATE TABLE rowtime_sink (
>        |    cnt BIGINT
>        |) WITH (
>        |  'connector' = 'values',
>        |  'sink-insert-only' = 'false',
>        |  'changelog-mode' = 'I,UA,D'
>        |)
>        |""".stripMargin)
>   val sql =
>     """
>       |INSERT INTO rowtime_sink
>       |SELECT COUNT(b) FROM (
>       | SELECT a, b, c, rowtime
>       | FROM (
>       |   SELECT *,
>       |     ROW_NUMBER() OVER (PARTITION BY b ORDER BY rowtime DESC) as rowNum
>       |   FROM T
>       | )
>       | WHERE rowNum = 1
>       | )
>     """.stripMargin
>   tEnv.executeSql(sql).await()
>   val rawResult = TestValuesTableFactory.getRawResults("rowtime_sink")
> }{code}
> E.g for the above sql, when enable MiniBatch optimization, the optimized plan is as following.
> {code:java}
> Sink(table=[default_catalog.default_database.rowtime_sink], fields=[EXPR$0])
> +- GlobalGroupAggregate(select=[COUNT_RETRACT(count$0) AS EXPR$0])
>    +- Exchange(distribution=[single])
>       +- LocalGroupAggregate(select=[COUNT_RETRACT(b) AS count$0, COUNT_RETRACT(*) AS count1$1])
>          +- Calc(select=[b])
>             +- Deduplicate(keep=[LastRow], key=[b], order=[ROWTIME])
>                +- Exchange(distribution=[hash[b]])
>                   +- Calc(select=[b, rowtime])
>                      +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
>                         +- DataStreamScan(table=[[default_catalog, default_database, T]], fields=[a, b, c, rowtime]){code}
> A `StreamExecMiniBatchAssigner` will be inserted. The behavior is weird because `Deduplicate` depends on rowTime, however `ProcTimeMiniBatchAssignerOperator` will send watermark every specified interval second depends on process time. For `Deduplicate`, the incoming watermark does not relate to rowTime of incoming record, it cannot indicate rowTime of all following input records are all larger than or equals to the current incoming watermark.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)