You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Andy (Jira)" <ji...@apache.org> on 2021/01/10 04:03:00 UTC

[jira] [Updated] (FLINK-20909) MiniBatch Interval derivation does not work well when enable miniBatch optimization in a job which contains deduplicate on row and unbounded aggregate.

     [ https://issues.apache.org/jira/browse/FLINK-20909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andy updated FLINK-20909:
-------------------------
    Description: 
MiniBatch Interval derivation does not work well when enable miniBatch optimization in a job which contains deduplicate on row and unbounded aggregate.
{code:java}
@Test
def testLastRowOnRowtime1(): Unit = {
  val t = env.fromCollection(rowtimeTestData)
    .assignTimestampsAndWatermarks(new RowtimeExtractor)
    .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime())
  tEnv.registerTable("T", t)
  tEnv.executeSql(
    s"""
       |CREATE TABLE rowtime_sink (
       |    cnt BIGINT
       |) WITH (
       |  'connector' = 'values',
       |  'sink-insert-only' = 'false',
       |  'changelog-mode' = 'I,UA,D'
       |)
       |""".stripMargin)
  val sql =
    """
      |INSERT INTO rowtime_sink
      |SELECT COUNT(b) FROM (
      | SELECT a, b, c, rowtime
      | FROM (
      |   SELECT *,
      |     ROW_NUMBER() OVER (PARTITION BY b ORDER BY rowtime DESC) as rowNum
      |   FROM T
      | )
      | WHERE rowNum = 1
      | )
    """.stripMargin

  tEnv.executeSql(sql).await()
  val rawResult = TestValuesTableFactory.getRawResults("rowtime_sink")
}{code}
E.g for the above sql, when enable MiniBatch optimization, the optimized plan is as following.
{code:java}
Sink(table=[default_catalog.default_database.rowtime_sink], fields=[EXPR$0])
+- GlobalGroupAggregate(select=[COUNT_RETRACT(count$0) AS EXPR$0])
   +- Exchange(distribution=[single])
      +- LocalGroupAggregate(select=[COUNT_RETRACT(b) AS count$0, COUNT_RETRACT(*) AS count1$1])
         +- Calc(select=[b])
            +- Deduplicate(keep=[LastRow], key=[b], order=[ROWTIME])
               +- Exchange(distribution=[hash[b]])
                  +- Calc(select=[b, rowtime])
                     +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
                        +- DataStreamScan(table=[[default_catalog, default_database, T]], fields=[a, b, c, rowtime]){code}
A `StreamExecMiniBatchAssigner` will be inserted. The behavior is weird because `Deduplicate` depends on rowTime, however `ProcTimeMiniBatchAssignerOperator` will send watermark every each 5 second depends on process time. For `Deduplicate`, the incoming watermark does not relate to rowTime of incoming record, it cannot indicate rowTime of all following input records are all larger than or equals to the current incoming watermark.

 

  was:
MiniBatch Interval derivation does not work well when enable miniBatch optimization in a job which contains deduplicate on row and unbounded aggregate.
{code:java}
@Test
def testLastRowOnRowtime1(): Unit = {
  val t = env.fromCollection(rowtimeTestData)
    .assignTimestampsAndWatermarks(new RowtimeExtractor)
    .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime())
  tEnv.registerTable("T", t)
  tEnv.executeSql(
    s"""
       |CREATE TABLE rowtime_sink (
       |    cnt BIGINT
       |) WITH (
       |  'connector' = 'values',
       |  'sink-insert-only' = 'false',
       |  'changelog-mode' = 'I,UA,D'
       |)
       |""".stripMargin)
  val sql =
    """
      |INSERT INTO rowtime_sink
      |SELECT COUNT(b) FROM (
      | SELECT a, b, c, rowtime
      | FROM (
      |   SELECT *,
      |     ROW_NUMBER() OVER (PARTITION BY b ORDER BY rowtime DESC) as rowNum
      |   FROM T
      | )
      | WHERE rowNum = 1
      | )
    """.stripMargin

  tEnv.executeSql(sql).await()
  val rawResult = TestValuesTableFactory.getRawResults("rowtime_sink")
}{code}
E.g for the above sql, when enable MiniBatch optimization, the optimized plan is as following.
{code:java}
Sink(table=[default_catalog.default_database.rowtime_sink], fields=[EXPR$0])
+- GlobalGroupAggregate(select=[COUNT_RETRACT(count$0) AS EXPR$0])
   +- Exchange(distribution=[single])
      +- LocalGroupAggregate(select=[COUNT_RETRACT(b) AS count$0, COUNT_RETRACT(*) AS count1$1])
         +- Calc(select=[b])
            +- Deduplicate(keep=[LastRow], key=[b], order=[ROWTIME])
               +- Exchange(distribution=[hash[b]])
                  +- Calc(select=[b, rowtime])
                     +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
                        +- DataStreamScan(table=[[default_catalog, default_database, T]], fields=[a, b, c, rowtime]){code}
A MiniBatchAssigner will be inserted. The behavior is weird because Deduplicate depends on rowTime, however `ProcTimeMiniBatchAssignerOperator` will send watermark every each 5 second depends on process time. For Deduplicate, the incoming watermark does not has any relationship with rowTime of incoming record, it cannot indicate rowTime of all following input records are all larger than or equals to the current incoming watermark.

 


> MiniBatch Interval derivation does not work well when enable miniBatch optimization in a job which contains deduplicate on row and unbounded aggregate.
> -------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-20909
>                 URL: https://issues.apache.org/jira/browse/FLINK-20909
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>            Reporter: Andy
>            Priority: Minor
>
> MiniBatch Interval derivation does not work well when enable miniBatch optimization in a job which contains deduplicate on row and unbounded aggregate.
> {code:java}
> @Test
> def testLastRowOnRowtime1(): Unit = {
>   val t = env.fromCollection(rowtimeTestData)
>     .assignTimestampsAndWatermarks(new RowtimeExtractor)
>     .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime())
>   tEnv.registerTable("T", t)
>   tEnv.executeSql(
>     s"""
>        |CREATE TABLE rowtime_sink (
>        |    cnt BIGINT
>        |) WITH (
>        |  'connector' = 'values',
>        |  'sink-insert-only' = 'false',
>        |  'changelog-mode' = 'I,UA,D'
>        |)
>        |""".stripMargin)
>   val sql =
>     """
>       |INSERT INTO rowtime_sink
>       |SELECT COUNT(b) FROM (
>       | SELECT a, b, c, rowtime
>       | FROM (
>       |   SELECT *,
>       |     ROW_NUMBER() OVER (PARTITION BY b ORDER BY rowtime DESC) as rowNum
>       |   FROM T
>       | )
>       | WHERE rowNum = 1
>       | )
>     """.stripMargin
>   tEnv.executeSql(sql).await()
>   val rawResult = TestValuesTableFactory.getRawResults("rowtime_sink")
> }{code}
> E.g for the above sql, when enable MiniBatch optimization, the optimized plan is as following.
> {code:java}
> Sink(table=[default_catalog.default_database.rowtime_sink], fields=[EXPR$0])
> +- GlobalGroupAggregate(select=[COUNT_RETRACT(count$0) AS EXPR$0])
>    +- Exchange(distribution=[single])
>       +- LocalGroupAggregate(select=[COUNT_RETRACT(b) AS count$0, COUNT_RETRACT(*) AS count1$1])
>          +- Calc(select=[b])
>             +- Deduplicate(keep=[LastRow], key=[b], order=[ROWTIME])
>                +- Exchange(distribution=[hash[b]])
>                   +- Calc(select=[b, rowtime])
>                      +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
>                         +- DataStreamScan(table=[[default_catalog, default_database, T]], fields=[a, b, c, rowtime]){code}
> A `StreamExecMiniBatchAssigner` will be inserted. The behavior is weird because `Deduplicate` depends on rowTime, however `ProcTimeMiniBatchAssignerOperator` will send watermark every each 5 second depends on process time. For `Deduplicate`, the incoming watermark does not relate to rowTime of incoming record, it cannot indicate rowTime of all following input records are all larger than or equals to the current incoming watermark.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)