Posted to issues@flink.apache.org by "Yun Gao (Jira)" <ji...@apache.org> on 2022/04/13 06:28:05 UTC

[jira] [Updated] (FLINK-23379) interval left join null value result out of order

     [ https://issues.apache.org/jira/browse/FLINK-23379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yun Gao updated FLINK-23379:
----------------------------
    Fix Version/s: 1.16.0

>  interval left join null value result out of order
> --------------------------------------------------
>
>                 Key: FLINK-23379
>                 URL: https://issues.apache.org/jira/browse/FLINK-23379
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.12.2
>            Reporter: waywtdcc
>            Priority: Major
>             Fix For: 1.15.0, 1.16.0
>
>         Attachments: image-2021-07-15-16-53-59-228.png
>
>
> * Scenario:
>  A person main table is left interval joined with a message information table.
>  A person record that has no match in the message table is emitted later than a subsequent person record that does have a match.
>  When a matched row and a null-padded row share the same primary key, the output is out of order: the null-padded row arrives after the matched row, which produces incorrect results downstream.
> Input:
> {"id": 1, "name":"chencc2", "message": "good boy2", "ts":"2021-03-26 18:56:43"}
> {"id": 1, "name":"chencc2", "age": "28", "ts":"2021-03-26 19:02:47"}
> {"id": 1, "name":"chencc2", "message": "good boy3", "ts":"2021-03-26 19:06:43"}
> {"id": 1, "name":"chencc2", "age": "27", "ts":"2021-03-26 19:06:47"}
> Output:
>  +I(chencc2,27,2021-03-26T19:06:47,good boy3,2021-03-26 19:06:43.000)
>  +I(chencc2,28,2021-03-26T19:02:47,null,null)
>  The second output row has timestamp 19:02, which is earlier than the 19:06 of the first output row, yet it is emitted later, causing data update errors.
>  
> * Code:
> {code:java}
> tableEnv.executeSql("drop table if exists persons_table_kafka2");
>          String kafka_source_sql = "CREATE TABLE persons_table_kafka2 (\n" +
>                  "  `id` BIGINT,\n" +
>                  "  `name` STRING,\n" +
>                  "  `age` INT,\n" +
>                  "  proctime as PROCTIME(),\n" +
>                  "  `ts` TIMESTAMP(3),\n" +
>                  " WATERMARK FOR ts AS ts\n" +
>                  ") WITH (\n" +
>                  "  'connector' = 'kafka',\n" +
>                  "  'topic' = 'persons_test2',\n" +
>                  "  'properties.bootstrap.servers' = 'node2:6667',\n" +
>                  "  'properties.group.id' = 'testGroa115',\n" +
>                  "  'scan.startup.mode' = 'group-offsets',\n" +
>                  "  'format' = 'json'\n" +
>                  ")";
>          tableEnv.executeSql(kafka_source_sql);
>         tableEnv.executeSql("drop table if exists persons_message_table_kafka2");
>          String kafka_source_sql2 = "CREATE TABLE persons_message_table_kafka2 (\n" +
>                  "  `id` BIGINT,\n" +
>                  "  `name` STRING,\n" +
>                  "  `message` STRING,\n" +
>                  "  `ts` TIMESTAMP(3),\n" +
>                  // " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND\n" +
>                  " WATERMARK FOR ts AS ts\n" +
>                  ") WITH (\n" +
>                  "  'connector' = 'kafka',\n" +
>                  "  'topic' = 'persons_extra_message2',\n" +
>                  "  'properties.bootstrap.servers' = 'node2:6667',\n" +
>                  "  'properties.group.id' = 'testGroud2e313',\n" +
>                  "  'scan.startup.mode' = 'group-offsets',\n" +
>                  "  'format' = 'json'\n" +
>                  ")";
>          tableEnv.executeSql(kafka_source_sql2);
>         tableEnv.executeSql("" +
>                  "CREATE TEMPORARY VIEW result_data_view " +
>                  " as " +
>                  " select " +
>                  " t1.name, t1.age,t1.ts as ts1,t2.message, cast (t2.ts as string) as ts2 " +
>                  " from persons_table_kafka2 t1 " +
>                  " left  join persons_message_table_kafka2 t2 on t1.name = t2.name and t1.ts between " +
>                          " t2.ts and t2.ts +  INTERVAL '3' MINUTE"
>                  );
>         Table resultTable = tableEnv.from("result_data_view");
>          DataStream<RowData> rowDataDataStream = tableEnv.toAppendStream(resultTable, RowData.class);
>          rowDataDataStream.print();
> {code}
>  
>  
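
The sample records in the description can be checked by hand against the join predicate (t1.ts between t2.ts and t2.ts + INTERVAL '3' MINUTE). The following is a minimal sketch in plain Java, not Flink code: the class IntervalJoinCheck and its matches method are hypothetical names introduced only for illustration, and it assumes the records carrying "age" belong to the person table and the records carrying "message" belong to the message table.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Flink: re-evaluates the join predicate by hand.
class IntervalJoinCheck {
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    static final Duration UPPER_BOUND = Duration.ofMinutes(3);

    // Mirrors: t1.ts BETWEEN t2.ts AND t2.ts + INTERVAL '3' MINUTE
    static boolean matches(String personTs, String messageTs) {
        LocalDateTime t1 = LocalDateTime.parse(personTs, FMT);
        LocalDateTime t2 = LocalDateTime.parse(messageTs, FMT);
        return !t1.isBefore(t2) && !t1.isAfter(t2.plus(UPPER_BOUND));
    }

    public static void main(String[] args) {
        // Assumed split of the sample input: "age" records vs. "message" records.
        String[] persons = {"2021-03-26 19:02:47", "2021-03-26 19:06:47"};
        String[] messages = {"2021-03-26 18:56:43", "2021-03-26 19:06:43"};
        for (String p : persons) {
            for (String m : messages) {
                System.out.println(p + " vs " + m + " -> " + matches(p, m));
            }
        }
    }
}
```

Under these assumptions only the 19:06:47 person record falls inside a message's interval, so the 19:02:47 record is the one expected to be padded with nulls. This is consistent with the two output rows in the report; the sketch says nothing about emission order, which is the subject of the bug.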



--
This message was sent by Atlassian Jira
(v8.20.1#820001)