You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Andy (Jira)" <ji...@apache.org> on 2021/01/10 04:02:01 UTC

[jira] [Created] (FLINK-20909) MiniBatch Interval derivation does not work well when enable miniBatch optimization in a job which contains deduplicate on row and unbounded aggregate.

Andy created FLINK-20909:
----------------------------

             Summary: MiniBatch Interval derivation does not work well when enable miniBatch optimization in a job which contains deduplicate on row and unbounded aggregate.
                 Key: FLINK-20909
                 URL: https://issues.apache.org/jira/browse/FLINK-20909
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / Planner
            Reporter: Andy


MiniBatch Interval derivation does not work well when enable miniBatch optimization in a job which contains deduplicate on row and unbounded aggregate.
{code:java}
@Test
def testLastRowOnRowtime1(): Unit = {
  val t = env.fromCollection(rowtimeTestData)
    .assignTimestampsAndWatermarks(new RowtimeExtractor)
    .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime())
  tEnv.registerTable("T", t)
  tEnv.executeSql(
    s"""
       |CREATE TABLE rowtime_sink (
       |    cnt BIGINT
       |) WITH (
       |  'connector' = 'values',
       |  'sink-insert-only' = 'false',
       |  'changelog-mode' = 'I,UA,D'
       |)
       |""".stripMargin)
  val sql =
    """
      |INSERT INTO rowtime_sink
      |SELECT COUNT(b) FROM (
      | SELECT a, b, c, rowtime
      | FROM (
      |   SELECT *,
      |     ROW_NUMBER() OVER (PARTITION BY b ORDER BY rowtime DESC) as rowNum
      |   FROM T
      | )
      | WHERE rowNum = 1
      | )
    """.stripMargin

  tEnv.executeSql(sql).await()
  val rawResult = TestValuesTableFactory.getRawResults("rowtime_sink")
}{code}
E.g for the above sql, when enable MiniBatch optimization, the optimized plan is as following.
{code:java}
Sink(table=[default_catalog.default_database.rowtime_sink], fields=[EXPR$0])
+- GlobalGroupAggregate(select=[COUNT_RETRACT(count$0) AS EXPR$0])
   +- Exchange(distribution=[single])
      +- LocalGroupAggregate(select=[COUNT_RETRACT(b) AS count$0, COUNT_RETRACT(*) AS count1$1])
         +- Calc(select=[b])
            +- Deduplicate(keep=[LastRow], key=[b], order=[ROWTIME])
               +- Exchange(distribution=[hash[b]])
                  +- Calc(select=[b, rowtime])
                     +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
                        +- DataStreamScan(table=[[default_catalog, default_database, T]], fields=[a, b, c, rowtime]){code}
A MiniBatchAssigner will be inserted. The behavior is weird because Deduplicate depends on rowTime, however `ProcTimeMiniBatchAssignerOperator` will send watermark every each 5 second depends on process time. For Deduplicate, the incoming watermark does not has any relationship with rowTime of incoming record, it cannot indicate rowTime of all following input records are all larger than or equals to the current incoming watermark.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)