Posted to dev@flink.apache.org by "nyingping (Jira)" <ji...@apache.org> on 2022/07/12 07:23:00 UTC

[jira] [Created] (FLINK-28504) Local-Global aggregation causes watermark alignment (table.exec.source.idle-timeout) of idle partition invalid

nyingping created FLINK-28504:
---------------------------------

             Summary: Local-Global aggregation causes watermark alignment (table.exec.source.idle-timeout) of idle partition invalid
                 Key: FLINK-28504
                 URL: https://issues.apache.org/jira/browse/FLINK-28504
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 1.14.2
         Environment: flink 1.14.1, kafka 2.4
            Reporter: nyingping
         Attachments: image-2022-07-12-15-11-51-653.png, image-2022-07-12-15-19-29-950.png, image-2022-07-12-15-20-06-919.png

I have a window TopN test job. The code is as follows:

```

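import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Class name is illustrative; the report showed only the statements inside main().
public class WindowTopNTest {
    public static void main(String[] args) {
        // REST port for the local cluster.
        Configuration configuration = new Configuration();
        configuration.setInteger(RestOptions.PORT, 8082);
        StreamExecutionEnvironment streamExecutionEnvironment =
                StreamExecutionEnvironment.getExecutionEnvironment(configuration);

        StreamTableEnvironment st = StreamTableEnvironment.create(streamExecutionEnvironment);

        // Mark a source as idle if it receives no records for 10 seconds.
        st.getConfig().getConfiguration().setString("table.exec.source.idle-timeout", "10s");

        // Kafka source table with an event-time watermark that lags `time` by 10 seconds.
        st.executeSql(
                "CREATE TABLE test (\n"
                        + "  `key` STRING,\n"
                        + "  `time` TIMESTAMP(3),\n"
                        + "  `price` float,\n"
                        + "  WATERMARK FOR `time` AS `time` - INTERVAL '10' SECOND\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'test',\n"
                        + "  'properties.bootstrap.servers' = 'testlocal:9092',\n"
                        + "  'properties.group.id' = 'windowGroup',\n"
                        + "  'scan.startup.mode' = 'latest-offset',\n"
                        + "  'format' = 'json'\n"
                        + ")");

        // Top 3 keys by total price within each 1-minute tumbling window.
        String sqlWindowTopN =
                "select * from (" +
                "  select *, " +
                "   ROW_NUMBER() over (partition by window_start, window_end order by total desc ) as rownum " +
                "     from (" +
                "       select key,window_start,window_end,count(key) as `count`,sum(price) total from table (" +
                "           tumble(TABLE test, DESCRIPTOR(`time`), interval '1' minute)" +
                "        ) group by window_start, window_end, key" +
                "   )" +
                ") where rownum <= 3";
        st.executeSql(sqlWindowTopN).print();
    }
}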

```

 

When I run the job, it produces no result, even after a long time.

The watermark appears as follows in the UI:

 

!image-2022-07-12-15-11-51-653.png!

I did not set the parallelism manually, so it defaults to 12. The Kafka source topic has only one partition, so the remaining source subtasks are idle. To keep the watermarks aligned across the entire job, I set the `table.exec.source.idle-timeout` option.
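
To make the expectation concrete, here is a minimal sketch of how a downstream operator combines watermarks from several inputs: it takes the minimum over the inputs that are not marked idle. This is a simplified model in plain Java, not Flink's actual implementation; the class `WatermarkAlignmentSketch`, the `Channel` type, and the choice to return `Long.MIN_VALUE` when every input is idle are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;

/** Simplified model of watermark combination; hypothetical, not Flink code. */
final class WatermarkAlignmentSketch {

    /** One upstream input: its latest watermark and whether it is marked idle. */
    static final class Channel {
        final long watermark;
        final boolean idle;

        Channel(long watermark, boolean idle) {
            this.watermark = watermark;
            this.idle = idle;
        }
    }

    /**
     * Returns the minimum watermark over all non-idle channels.
     * Returns Long.MIN_VALUE if every channel is idle (a choice of this sketch).
     */
    static long combinedWatermark(List<Channel> channels) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Channel c : channels) {
            if (!c.idle) {
                anyActive = true;
                min = Math.min(min, c.watermark);
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        long noWatermarkYet = Long.MIN_VALUE;

        // One subtask reads the single Kafka partition; the others never see data
        // and are NOT marked idle, so the minimum stays at Long.MIN_VALUE.
        List<Channel> notMarkedIdle = Arrays.asList(
                new Channel(1_000L, false),
                new Channel(noWatermarkYet, false),
                new Channel(noWatermarkYet, false));
        System.out.println(combinedWatermark(notMarkedIdle));

        // Same inputs, but the empty ones are marked idle and therefore ignored,
        // so the combined watermark is 1000.
        List<Channel> markedIdle = Arrays.asList(
                new Channel(1_000L, false),
                new Channel(noWatermarkYet, true),
                new Channel(noWatermarkYet, true));
        System.out.println(combinedWatermark(markedIdle));
    }
}
```

In this model the window can only fire in the second case, which is the behavior I expect the idle timeout to provide.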

 

As shown above, I found that the system automatically split the window TopN SQL into local-global aggregation tasks. In the local phase, the watermark did not behave as I expected.
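
For readers unfamiliar with the term, the following is a rough sketch of what a local-global split means for `sum(price) ... group by key`: each parallel subtask first pre-aggregates its own records (local phase), and a second step merges the partial results per key (global phase). It is a plain-Java illustration only; `LocalGlobalSketch`, `Event`, `localSum`, and `globalSum` are hypothetical names and do not correspond to Flink operators.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative two-phase aggregation; hypothetical, not Flink code. */
final class LocalGlobalSketch {

    /** A minimal record with the two columns used by the aggregation. */
    static final class Event {
        final String key;
        final double price;

        Event(String key, double price) {
            this.key = key;
            this.price = price;
        }
    }

    /** Local phase: one subtask sums the prices of its own records per key. */
    static Map<String, Double> localSum(List<Event> events) {
        Map<String, Double> partial = new HashMap<>();
        for (Event e : events) {
            partial.merge(e.key, e.price, Double::sum);
        }
        return partial;
    }

    /** Global phase: merge the partial sums from all subtasks per key. */
    static Map<String, Double> globalSum(List<Map<String, Double>> partials) {
        Map<String, Double> total = new HashMap<>();
        for (Map<String, Double> partial : partials) {
            for (Map.Entry<String, Double> entry : partial.entrySet()) {
                total.merge(entry.getKey(), entry.getValue(), Double::sum);
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // Two subtasks, each holding part of the same window's data.
        Map<String, Double> subtask0 =
                localSum(Arrays.asList(new Event("a", 1.5), new Event("a", 2.0)));
        Map<String, Double> subtask1 =
                localSum(Arrays.asList(new Event("a", 2.5), new Event("b", 4.0)));

        Map<String, Double> total = globalSum(Arrays.asList(subtask0, subtask1));
        System.out.println(total.get("a")); // 6.0
        System.out.println(total.get("b")); // 4.0
    }
}
```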

 

Manually setting the parallelism to 1 did what I expected.

`streamExecutionEnvironment.setParallelism(1);`

!image-2022-07-12-15-19-29-950.png!

 

Alternatively, I can configure the system not to split the aggregation into local-global phases. With that setting, the `table.exec.source.idle-timeout` configuration takes effect and the watermark is aligned.

`st.getConfig().getConfiguration().setString("table.optimizer.agg-phase-strategy", "ONE_PHASE");`

Result:

!image-2022-07-12-15-20-06-919.png!

 

In summary, I would like to be able to use two-phase (local-global) aggregation and still have the watermarks aligned.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)