You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Martijn Visser (Jira)" <ji...@apache.org> on 2022/05/04 11:09:00 UTC
[jira] [Commented] (FLINK-27418) Flink SQL TopN result is wrong
[ https://issues.apache.org/jira/browse/FLINK-27418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17531667#comment-17531667 ]
Martijn Visser commented on FLINK-27418:
----------------------------------------
[~jark] What do you think?
> Flink SQL TopN result is wrong
> ------------------------------
>
> Key: FLINK-27418
> URL: https://issues.apache.org/jira/browse/FLINK-27418
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.12.2, 1.14.3
> Environment: Flink 1.12.2 and Flink 1.14.3; the test results are sometimes wrong on both versions
> Reporter: zhangbin
> Priority: Major
>
> Running the same Flink SQL TopN query multiple times produces different results: sometimes the result is correct and sometimes it is incorrect.
> Example:
> {code:java}
> /**
> * Reproduction case for the non-deterministic TopN result.
> *
> * Registers the "waybill" and "item" sources as temporary views, runs a
> * left join + row_number() (rno = 1) + group-by query over them, and prints
> * the resulting retract stream to stderr.
> */
> @Test
> public void flinkSqlJoinRetract() {
> // Streaming-mode table environment on the Blink planner.
> EnvironmentSettings settings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
> StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
> // Parallelism is pinned to 1 for the whole job.
> streamEnv.setParallelism(1);
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
> // Idle state retention is set to 10000 seconds.
> tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(10000));
> RowTypeInfo waybillTableTypeInfo = buildWaybillTableTypeInfo();
> RowTypeInfo itemTableTypeInfo = buildItemTableTypeInfo();
> SourceFunction<Row> waybillSourceFunction = buildWaybillStreamSource(waybillTableTypeInfo);
> SourceFunction<Row> itemSourceFunction = buildItemStreamSource(itemTableTypeInfo);
> String waybillTable = "waybill";
> String itemTable = "item";
> // The table names double as the source operator names.
> DataStreamSource<Row> waybillStream = streamEnv.addSource(
> waybillSourceFunction,
> waybillTable,
> waybillTableTypeInfo);
> DataStreamSource<Row> itemStream = streamEnv.addSource(
> itemSourceFunction,
> itemTable,
> itemTableTypeInfo);
> // View columns = the row type's field names plus a trailing "proctime.proctime"
> // expression (presumably declaring a processing-time attribute; the query below
> // does not reference it).
> Expression[] waybillFields = ExpressionParser
> .parseExpressionList(String.join(",", waybillTableTypeInfo.getFieldNames())
> + ",proctime.proctime").toArray(new Expression[0]);
> Expression[] itemFields = ExpressionParser
> .parseExpressionList(
> String.join(",", itemTableTypeInfo.getFieldNames()) + ",proctime.proctime")
> .toArray(new Expression[0]);
> tableEnv.createTemporaryView(waybillTable, waybillStream, waybillFields);
> tableEnv.createTemporaryView(itemTable, itemStream, itemFields);
> // Query, from the inside out:
> // 1. waybill LEFT JOIN item on id, taking item.city_id when present, else waybill.city_id;
> // 2. number the joined rows per id by utime descending and keep only rno = 1;
> // 3. count the surviving rows per city_id.
> String sql =
> "select \n"
> + " city_id, \n"
> + " count(*) as cnt\n"
> + "from (\n"
> + " select id,city_id\n"
> + " from (\n"
> + " select \n"
> + " id,\n"
> + " city_id,\n"
> + " row_number() over(partition by id order by utime desc ) as rno \n"
> + " from (\n"
> + " select \n"
> + " waybill.id as id,\n"
> + " coalesce(item.city_id, waybill.city_id) as city_id,\n"
> + " waybill.utime as utime \n"
> + " from waybill left join item \n"
> + " on waybill.id = item.id \n"
> + " ) \n"
> + " )\n"
> + " where rno =1\n"
> + ")\n"
> + "group by city_id";
> // NOTE(review): statementSet is created but never used in this snippet.
> StatementSet statementSet = tableEnv.createStatementSet();
> Table table = tableEnv.sqlQuery(sql);
> // Each element is a (Boolean, Row) pair; presumably true = add and false = retract,
> // matching the (true,...) / (false,...) lines in the sample output.
> DataStream<Tuple2<Boolean, Row>> rowDataStream = tableEnv.toRetractStream(table, Row.class);
> rowDataStream.printToErr();
> try {
> streamEnv.execute();
> } catch (Exception e) {
> // Failures are only printed; the test itself never fails on them.
> e.printStackTrace();
> }
> }
> private static RowTypeInfo buildWaybillTableTypeInfo() {
> TypeInformation[] types = new TypeInformation[]{Types.INT(), Types.STRING(), Types.LONG(), Types.LONG()};
> String[] fields = new String[]{"id", "city_id", "rider_id", "utime"};
> return new RowTypeInfo(types, fields);
> }
> private static RowTypeInfo buildItemTableTypeInfo() {
> TypeInformation[] types = new TypeInformation[]{Types.INT(), Types.STRING(), Types.LONG()};
> String[] fields = new String[]{"id", "city_id", "utime"};
> return new RowTypeInfo(types, fields);
> }
> // Row layout: id, city_id, rider_id, utime (same order as buildWaybillTableTypeInfo).
> // Emits four waybill rows back to back, with no delay between them, then stops.
> // rowTypeInfo is only used to label the fields when each row is printed.
> private static SourceFunction<Row> buildWaybillStreamSource(RowTypeInfo rowTypeInfo) {
> return new SourceFunction<Row>() {
> // Set by cancel(), or by run() itself once all rows have been emitted.
> private volatile boolean stopped = false;
> int count = 0;
> // ids and cityIds are read in lockstep; id 111 appears twice (first and fourth row).
> int[] ids = {111, 222, 333, 111};
> String[] cityIds = {"A", "A", "B", "A"};
> @Override
> public void run(SourceContext<Row> ctx) throws Exception {
> while (!stopped) {
> int id = ids[count % ids.length];
> String cityId = cityIds[count % cityIds.length];
> Row row = new Row(4);
> row.setField(0, id);
> row.setField(1, cityId);
> // rider_id: a random int from RandomUtils.nextInt(1000, 2000), widened to long.
> row.setField(2, (long) RandomUtils.nextInt(1000, 2000));
> // utime: wall-clock millis at emission time.
> row.setField(3, System.currentTimeMillis());
> printRow(rowTypeInfo, row);
> ctx.collect(row);
> // Stop after the fourth row (count becomes 4).
> if (++count > 3) {
> stopped = true;
> }
> }
> }
> @Override
> public void cancel() {
> stopped = true;
> }
> };
> }
> // Row layout: id, city_id, utime (utime is never populated, see below).
> // Emits two item rows, sleeping before each one, then stops.
> // rowTypeInfo is only used to label the fields when each row is printed.
> private static SourceFunction<Row> buildItemStreamSource(RowTypeInfo rowTypeInfo) {
> return new SourceFunction<Row>() {
> // Set by cancel(), or by run() itself once both rows have been emitted.
> private volatile boolean stopped = false;
> int count = 0;
> // ids and cityIds are read in lockstep: (111, "C") then (333, "D").
> int[] ids = {111, 333};
> String[] cityIds = {"C", "D"};
> @Override
> public void run(SourceContext<Row> ctx) throws Exception {
> while (!stopped) {
> // Delay each item row by a random duration from RandomUtils.nextInt(1000, 2000) ms.
> Thread.sleep(RandomUtils.nextInt(1000, 2000));
> int id = ids[count % ids.length];
> String cityId = cityIds[count % cityIds.length];
> Row row = new Row(3);
> row.setField(0, id);
> row.setField(1, cityId);
> // Field 2 (utime) is left unset because the assignment below is disabled;
> // it shows up as "utime:null" in the sample output.
> //row.setField(2, System.currentTimeMillis());
> printRow(rowTypeInfo, row);
> ctx.collect(row);
> // Stop after the second row.
> if (++count >= 2) {
> stopped = true;
> }
> }
> }
> @Override
> public void cancel() {
> stopped = true;
> }
> };
> }
> public static void printRow(RowTypeInfo rowTypeInfo, Row row) {
> String prefix = "";
> for (int i = 0; i < rowTypeInfo.getArity(); ++i) {
> prefix = i > 0 ? "," : "";
> System.out.print(prefix + rowTypeInfo.getFieldNames()[i] + ":" + row.getField(i));
> }
> System.out.println();
> }
> {code}
> ------------------------------------------------------------
> ||*wrong result*||right result||
> |id:111,city_id:A,rider_id:1137,utime:1650979957702
> id:222,city_id:A,rider_id:1976,utime:1650979957725
> id:333,city_id:B,rider_id:1916,utime:1650979957725
> id:111,city_id:A,rider_id:1345,utime:1650979957725
> (true,A,1)
> (false,A,1)
> (true,A,2)
> (true,B,1)
> (false,A,2)
> (true,A,1)
> (false,A,1)
> (true,A,2)
> id:111,city_id:C,utime:null
> (false,A,2)
> (true,A,1)
> (true,C,1)
> (false,A,1)
> (false,C,1)
> (true,C,2)
> id:333,city_id: D,utime:null
> (false,B,1)
> (true,D,1)
> The final result:
> C,2
> D,1
> is wrong.|
> id:111,city_id:A,rider_id:1155,utime:1650980662019
> id:222,city_id:A,rider_id:1875,utime:1650980662042
> id:333,city_id:B,rider_id:1430,utime:1650980662042
> id:111,city_id:A,rider_id:1308,utime:1650980662042
> (true,A,1)
> (false,A,1)
> (true,A,2)
> (true,B,1)
> (false,A,2)
> (true,A,1)
> (false,A,1)
> (true,A,2)
> id:111,city_id:C,utime:null
> (false,A,2)
> (true,A,1)
> (false,A,1)
> (true,A,2)
> (false,A,2)
> (true,A,1)
> (true,C,1)
> id:333,city_id: D,utime:null
> (false,B,1)
> (true,D,1)
> The final result:
> A,1
> C,2
> D,1
> is right.|
>
>
--
This message was sent by Atlassian Jira
(v8.20.7#820007)