You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "zhangbin (Jira)" <ji...@apache.org> on 2022/04/26 14:12:00 UTC

[jira] [Updated] (FLINK-27418) Flink SQL TopN result is wrong

     [ https://issues.apache.org/jira/browse/FLINK-27418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhangbin updated FLINK-27418:
-----------------------------
    Description: 
Running the same Flink SQL TopN query several times gives different results: sometimes the result is correct and sometimes it is wrong.

Example:

```

/**
 * Reproduces FLINK-27418: left join of waybill and item, followed by a ROW_NUMBER TopN
 * (rno = 1 per id), followed by a count per city_id. The retract stream is printed to stderr.
 *
 * <p>With the fixed source data (ids 111, 222, 333, 111 with cities A, A, B, A; items 111 -> C and
 * 333 -> D), each id keeps one row after the TopN (111 -> C, 222 -> A, 333 -> D). The expected
 * final counts are therefore A=1, C=1, D=1.
 */
@Test
public void testFlinkSqlJoinRetract() {
    EnvironmentSettings settings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();

    StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    // A single subtask per operator keeps the printed retract stream in one sequence.
    streamEnv.setParallelism(1);
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
    tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(10000));

    RowTypeInfo waybillTableTypeInfo = buildWaybillTableTypeInfo();
    RowTypeInfo itemTableTypeInfo = buildItemTableTypeInfo();
    SourceFunction<Row> waybillSourceFunction = buildWaybillStreamSource(waybillTableTypeInfo);
    SourceFunction<Row> itemSourceFunction = buildItemStreamSource(itemTableTypeInfo);
    String waybillTable = "waybill";
    String itemTable = "item";

    DataStreamSource<Row> waybillStream = streamEnv.addSource(
            waybillSourceFunction,
            waybillTable,
            waybillTableTypeInfo);
    DataStreamSource<Row> itemStream = streamEnv.addSource(
            itemSourceFunction,
            itemTable,
            itemTableTypeInfo);

    // Expose every physical field plus a processing-time attribute named "proctime".
    Expression[] waybillFields = ExpressionParser
            .parseExpressionList(String.join(",", waybillTableTypeInfo.getFieldNames())
                    + ",proctime.proctime").toArray(new Expression[0]);
    Expression[] itemFields = ExpressionParser
            .parseExpressionList(
                    String.join(",", itemTableTypeInfo.getFieldNames()) + ",proctime.proctime")
            .toArray(new Expression[0]);

    tableEnv.createTemporaryView(waybillTable, waybillStream, waybillFields);
    tableEnv.createTemporaryView(itemTable, itemStream, itemFields);

    String sql =
            "select \n"
                    + " city_id, \n"
                    + " count(*) as cnt\n"
                    + "from (\n"
                    + " select id,city_id\n"
                    + " from (\n"
                    + " select \n"
                    + " id,\n"
                    + " city_id,\n"
                    + " row_number() over(partition by id order by utime desc ) as rno \n"
                    + " from (\n"
                    + " select \n"
                    + " waybill.id as id,\n"
                    + " coalesce(item.city_id, waybill.city_id) as city_id,\n"
                    + " waybill.utime as utime \n"
                    + " from waybill left join item \n"
                    + " on waybill.id = item.id \n"
                    + " ) \n"
                    + " )\n"
                    + " where rno =1\n"
                    + ")\n"
                    + "group by city_id";

    Table table = tableEnv.sqlQuery(sql);
    DataStream<Tuple2<Boolean, Row>> rowDataStream = tableEnv.toRetractStream(table, Row.class);
    rowDataStream.printToErr();
    try {
        streamEnv.execute();
    } catch (Exception e) {
        // Propagate (with the cause) so that a failed job also fails the test, instead of
        // printing the stack trace and passing.
        throw new IllegalStateException("Flink job failed", e);
    }
}

private static RowTypeInfo buildWaybillTableTypeInfo()

{ TypeInformation[] types = new TypeInformation[]

{Types.INT(), Types.STRING(), Types.LONG(), Types.LONG()}

;
String[] fields = new String[]\{"id", "city_id", "rider_id", "utime"};
return new RowTypeInfo(types, fields);
}

private static RowTypeInfo buildItemTableTypeInfo()

{ TypeInformation[] types = new TypeInformation[]

{Types.INT(), Types.STRING(), Types.LONG()}

;
String[] fields = new String[]\{"id", "city_id", "utime"};
return new RowTypeInfo(types, fields);
}

//id,rider_id,city_id,utime
private static SourceFunction<Row> buildWaybillStreamSource(RowTypeInfo rowTypeInfo) {
return new SourceFunction<Row>()

{ private volatile boolean stopped = false; int count = 0; int[] ids =

{111, 222, 333, 111}

;
String[] cityIds = \{"A", "A", "B", "A"};

@Override
public void run(SourceContext<Row> ctx) throws Exception {
while (!stopped) {
int id = ids[count % ids.length];
String cityId = cityIds[count % cityIds.length];
Row row = new Row(4);
row.setField(0, id);
row.setField(1, cityId);
row.setField(2, (long) RandomUtils.nextInt(1000, 2000));
row.setField(3, System.currentTimeMillis());
printRow(rowTypeInfo, row);
ctx.collect(row);
if (++count > 3)

{ stopped = true; }
}
}

@Override
public void cancel() \{ stopped = true; }

};
}

//id,city_id,utime
private static SourceFunction<Row> buildItemStreamSource(RowTypeInfo rowTypeInfo) {
return new SourceFunction<Row>()

{ private volatile boolean stopped = false; int count = 0; int[] ids =

{111, 333}

;
String[] cityIds = \{"C", "D"};

@Override
public void run(SourceContext<Row> ctx) throws Exception {
while (!stopped) {
Thread.sleep(RandomUtils.nextInt(1000, 2000));
int id = ids[count % ids.length];
String cityId = cityIds[count % cityIds.length];
Row row = new Row(3);
row.setField(0, id);
row.setField(1, cityId);
//row.setField(2, System.currentTimeMillis());
printRow(rowTypeInfo, row);
ctx.collect(row);
if (++count >= 2)

{ stopped = true; }

}

}

@Override
public void cancel()

{ stopped = true; }

};
}

/**
 * Prints {@code row} to stdout as {@code name:value} pairs separated by commas, in the field
 * order of {@code rowTypeInfo}. Null values print as {@code null}.
 */
public static void printRow(RowTypeInfo rowTypeInfo, Row row) {
    String[] fieldNames = rowTypeInfo.getFieldNames();
    StringBuilder line = new StringBuilder();
    for (int i = 0; i < rowTypeInfo.getArity(); ++i) {
        if (i > 0) {
            line.append(',');
        }
        line.append(fieldNames[i]).append(':').append(row.getField(i));
    }
    // Emit the whole line with one call. Both sources call this method and may run
    // concurrently, so printing field by field can interleave their output.
    System.out.println(line.toString());
}

------------------------------------------------------------

Wrong Result:

id:111,city_id:A,rider_id:1137,utime:1650979957702
id:222,city_id:A,rider_id:1976,utime:1650979957725
id:333,city_id:B,rider_id:1916,utime:1650979957725
id:111,city_id:A,rider_id:1345,utime:1650979957725

(true,A,1)
(false,A,1)
(true,A,2)
(true,B,1)
(false,A,2)
(true,A,1)
(false,A,1)
(true,A,2)
id:111,city_id:C,utime:null

(false,A,2)
(true,A,1)
(true,C,1)
(false,A,1)
(false,C,1)
(true,C,2)

id:333,city_id:D,utime:null
(false,B,1)
(true,D,1)

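The final result:
C,2
D,1
is wrong. After (true,C,1), the aggregation receives a retraction for city A ((false,A,1)) and a second insertion for city C ((false,C,1), (true,C,2)). As a result, the row for id 222 (city A) is lost and id 111 is counted twice under C.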
------------------------------------------

Right result:

------------------------------------------

id:111,city_id:A,rider_id:1155,utime:1650980662019
id:222,city_id:A,rider_id:1875,utime:1650980662042
id:333,city_id:B,rider_id:1430,utime:1650980662042
id:111,city_id:A,rider_id:1308,utime:1650980662042

(true,A,1)
(false,A,1)
(true,A,2)
(true,B,1)
(false,A,2)
(true,A,1)
(false,A,1)
(true,A,2)

id:111,city_id:C,utime:null
(false,A,2)
(true,A,1)
(false,A,1)
(true,A,2)
(false,A,2)
(true,A,1)
(true,C,1)

id:333,city_id:D,utime:null
(false,B,1)
(true,D,1)

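The final result:
A,1
C,1
D,1
is right. After the TopN, each id keeps exactly one row (111 -> C, 222 -> A, 333 -> D). This matches the last update for C in the trace above, (true,C,1).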

```

 

  was:
Running the same Flink SQL TopN query several times gives different results: sometimes the result is correct and sometimes it is wrong.

Example:

/**
 * Reproduces FLINK-27418: left join of waybill and item, followed by a ROW_NUMBER TopN
 * (rno = 1 per id), followed by a count per city_id. The retract stream is printed to stderr.
 *
 * <p>With the fixed source data (ids 111, 222, 333, 111 with cities A, A, B, A; items 111 -> C and
 * 333 -> D), each id keeps one row after the TopN (111 -> C, 222 -> A, 333 -> D). The expected
 * final counts are therefore A=1, C=1, D=1.
 */
@Test
public void testFlinkSqlJoinRetract() {
    EnvironmentSettings settings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();

    StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    // A single subtask per operator keeps the printed retract stream in one sequence.
    streamEnv.setParallelism(1);
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
    tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(10000));

    RowTypeInfo waybillTableTypeInfo = buildWaybillTableTypeInfo();
    RowTypeInfo itemTableTypeInfo = buildItemTableTypeInfo();
    SourceFunction<Row> waybillSourceFunction = buildWaybillStreamSource(waybillTableTypeInfo);
    SourceFunction<Row> itemSourceFunction = buildItemStreamSource(itemTableTypeInfo);
    String waybillTable = "waybill";
    String itemTable = "item";

    DataStreamSource<Row> waybillStream = streamEnv.addSource(
            waybillSourceFunction,
            waybillTable,
            waybillTableTypeInfo);
    DataStreamSource<Row> itemStream = streamEnv.addSource(
            itemSourceFunction,
            itemTable,
            itemTableTypeInfo);

    // Expose every physical field plus a processing-time attribute named "proctime".
    Expression[] waybillFields = ExpressionParser
            .parseExpressionList(String.join(",", waybillTableTypeInfo.getFieldNames())
                    + ",proctime.proctime").toArray(new Expression[0]);
    Expression[] itemFields = ExpressionParser
            .parseExpressionList(
                    String.join(",", itemTableTypeInfo.getFieldNames()) + ",proctime.proctime")
            .toArray(new Expression[0]);

    tableEnv.createTemporaryView(waybillTable, waybillStream, waybillFields);
    tableEnv.createTemporaryView(itemTable, itemStream, itemFields);

    String sql =
            "select \n"
                    + " city_id, \n"
                    + " count(*) as cnt\n"
                    + "from (\n"
                    + " select id,city_id\n"
                    + " from (\n"
                    + " select \n"
                    + " id,\n"
                    + " city_id,\n"
                    + " row_number() over(partition by id order by utime desc ) as rno \n"
                    + " from (\n"
                    + " select \n"
                    + " waybill.id as id,\n"
                    + " coalesce(item.city_id, waybill.city_id) as city_id,\n"
                    + " waybill.utime as utime \n"
                    + " from waybill left join item \n"
                    + " on waybill.id = item.id \n"
                    + " ) \n"
                    + " )\n"
                    + " where rno =1\n"
                    + ")\n"
                    + "group by city_id";

    Table table = tableEnv.sqlQuery(sql);
    DataStream<Tuple2<Boolean, Row>> rowDataStream = tableEnv.toRetractStream(table, Row.class);
    rowDataStream.printToErr();
    try {
        streamEnv.execute();
    } catch (Exception e) {
        // Propagate (with the cause) so that a failed job also fails the test, instead of
        // printing the stack trace and passing.
        throw new IllegalStateException("Flink job failed", e);
    }
}

private static RowTypeInfo buildWaybillTableTypeInfo()

{ TypeInformation[] types = new TypeInformation[]\\{Types.INT(), Types.STRING(), Types.LONG(), Types.LONG()}

;
String[] fields = new String[]\{"id", "city_id", "rider_id", "utime"};
return new RowTypeInfo(types, fields);
}

private static RowTypeInfo buildItemTableTypeInfo()

{ TypeInformation[] types = new TypeInformation[]\\{Types.INT(), Types.STRING(), Types.LONG()}

;
String[] fields = new String[]\{"id", "city_id", "utime"};
return new RowTypeInfo(types, fields);
}

//id,rider_id,city_id,utime
private static SourceFunction<Row> buildWaybillStreamSource(RowTypeInfo rowTypeInfo) {
return new SourceFunction<Row>()

{ private volatile boolean stopped = false; int count = 0; int[] ids = \\{111, 222, 333, 111}

;
String[] cityIds = \{"A", "A", "B", "A"};

@Override
public void run(SourceContext<Row> ctx) throws Exception {
while (!stopped) {
int id = ids[count % ids.length];
String cityId = cityIds[count % cityIds.length];
Row row = new Row(4);
row.setField(0, id);
row.setField(1, cityId);
row.setField(2, (long) RandomUtils.nextInt(1000, 2000));
row.setField(3, System.currentTimeMillis());
printRow(rowTypeInfo, row);
ctx.collect(row);
if (++count > 3) {
stopped = true;
}
}
}

@Override
public void cancel() {
stopped = true;
}
};
}

//id,city_id,utime
private static SourceFunction<Row> buildItemStreamSource(RowTypeInfo rowTypeInfo) {
return new SourceFunction<Row>()

{ private volatile boolean stopped = false; int count = 0; int[] ids = \\{111, 333}

;
String[] cityIds = \{"C", "D"};

@Override
public void run(SourceContext<Row> ctx) throws Exception {
while (!stopped) {
Thread.sleep(RandomUtils.nextInt(1000, 2000));
int id = ids[count % ids.length];
String cityId = cityIds[count % cityIds.length];
Row row = new Row(3);
row.setField(0, id);
row.setField(1, cityId);
//row.setField(2, System.currentTimeMillis());
printRow(rowTypeInfo, row);
ctx.collect(row);
if (++count >= 2)

{ stopped = true; } }

}

@Override
public void cancel() {
stopped = true;
}
};
}

/**
 * Prints {@code row} to stdout as {@code name:value} pairs separated by commas, in the field
 * order of {@code rowTypeInfo}. Null values print as {@code null}.
 */
public static void printRow(RowTypeInfo rowTypeInfo, Row row) {
    String[] fieldNames = rowTypeInfo.getFieldNames();
    StringBuilder line = new StringBuilder();
    for (int i = 0; i < rowTypeInfo.getArity(); ++i) {
        if (i > 0) {
            line.append(',');
        }
        line.append(fieldNames[i]).append(':').append(row.getField(i));
    }
    // Emit the whole line with one call. Both sources call this method and may run
    // concurrently, so printing field by field can interleave their output.
    System.out.println(line.toString());
}

------------------------------------------------------------

Wrong Result:

id:111,city_id:A,rider_id:1137,utime:1650979957702
id:222,city_id:A,rider_id:1976,utime:1650979957725
id:333,city_id:B,rider_id:1916,utime:1650979957725
id:111,city_id:A,rider_id:1345,utime:1650979957725

(true,A,1)
(false,A,1)
(true,A,2)
(true,B,1)
(false,A,2)
(true,A,1)
(false,A,1)
(true,A,2)
id:111,city_id:C,utime:null

(false,A,2)
(true,A,1)
(true,C,1)
(false,A,1)
(false,C,1)
(true,C,2)

id:333,city_id:D,utime:null
(false,B,1)
(true,D,1)

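The final result:
C,2
D,1
is wrong. After (true,C,1), the aggregation receives a retraction for city A ((false,A,1)) and a second insertion for city C ((false,C,1), (true,C,2)). As a result, the row for id 222 (city A) is lost and id 111 is counted twice under C.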
------------------------------------------

Right result:

------------------------------------------

id:111,city_id:A,rider_id:1155,utime:1650980662019
id:222,city_id:A,rider_id:1875,utime:1650980662042
id:333,city_id:B,rider_id:1430,utime:1650980662042
id:111,city_id:A,rider_id:1308,utime:1650980662042

(true,A,1)
(false,A,1)
(true,A,2)
(true,B,1)
(false,A,2)
(true,A,1)
(false,A,1)
(true,A,2)

id:111,city_id:C,utime:null
(false,A,2)
(true,A,1)
(false,A,1)
(true,A,2)
(false,A,2)
(true,A,1)
(true,C,1)

id:333,city_id:D,utime:null
(false,B,1)
(true,D,1)

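The final result:
A,1
C,1
D,1
is right. After the TopN, each id keeps exactly one row (111 -> C, 222 -> A, 333 -> D). This matches the last update for C in the trace above, (true,C,1).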


> Flink SQL TopN result is wrong
> ------------------------------
>
>                 Key: FLINK-27418
>                 URL: https://issues.apache.org/jira/browse/FLINK-27418
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.12.2, 1.14.3
>         Environment: Flink 1.12.2 and Flink 1.14.3 test results are sometimes wrong
>            Reporter: zhangbin
>            Priority: Major
>
> Running the same Flink SQL TopN query several times gives different results: sometimes the result is correct and sometimes it is wrong.
> Example:
> ```
> @Test
> public void testFlinkSqlJoinRetract() {
> EnvironmentSettings settings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
> StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
> streamEnv.setParallelism(1);
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
> tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(10000));
> RowTypeInfo waybillTableTypeInfo = buildWaybillTableTypeInfo();
> RowTypeInfo itemTableTypeInfo = buildItemTableTypeInfo();
> SourceFunction<Row> waybillSourceFunction = buildWaybillStreamSource(waybillTableTypeInfo);
> SourceFunction<Row> itemSourceFunction = buildItemStreamSource(itemTableTypeInfo);
> String waybillTable = "waybill";
> String itemTable = "item";
> DataStreamSource<Row> waybillStream = streamEnv.addSource(
> waybillSourceFunction,
> waybillTable,
> waybillTableTypeInfo);
> DataStreamSource<Row> itemStream = streamEnv.addSource(
> itemSourceFunction,
> itemTable,
> itemTableTypeInfo);
> Expression[] waybillFields = ExpressionParser
> .parseExpressionList(String.join(",", waybillTableTypeInfo.getFieldNames())
> + ",proctime.proctime").toArray(new Expression[0]);
> Expression[] itemFields = ExpressionParser
> .parseExpressionList(
> String.join(",", itemTableTypeInfo.getFieldNames()) + ",proctime.proctime")
> .toArray(new Expression[0]);
> tableEnv.createTemporaryView(waybillTable, waybillStream, waybillFields);
> tableEnv.createTemporaryView(itemTable, itemStream, itemFields);
> String sql =
> "select \n"
> + " city_id, \n"
> + " count(*) as cnt\n"
> + "from (\n"
> + " select id,city_id\n"
> + " from (\n"
> + " select \n"
> + " id,\n"
> + " city_id,\n"
> + " row_number() over(partition by id order by utime desc ) as rno \n"
> + " from (\n"
> + " select \n"
> + " waybill.id as id,\n"
> + " coalesce(item.city_id, waybill.city_id) as city_id,\n"
> + " waybill.utime as utime \n"
> + " from waybill left join item \n"
> + " on waybill.id = item.id \n"
> + " ) \n"
> + " )\n"
> + " where rno =1\n"
> + ")\n"
> + "group by city_id";
> StatementSet statementSet = tableEnv.createStatementSet();
> Table table = tableEnv.sqlQuery(sql);
> DataStream<Tuple2<Boolean, Row>> rowDataStream = tableEnv.toRetractStream(table, Row.class);
> rowDataStream.printToErr();
> try
> { streamEnv.execute(); }
> catch (Exception e)
> { e.printStackTrace(); }
> }
> private static RowTypeInfo buildWaybillTableTypeInfo()
> { TypeInformation[] types = new TypeInformation[]
> {Types.INT(), Types.STRING(), Types.LONG(), Types.LONG()}
> ;
> String[] fields = new String[]\{"id", "city_id", "rider_id", "utime"};
> return new RowTypeInfo(types, fields);
> }
> private static RowTypeInfo buildItemTableTypeInfo()
> { TypeInformation[] types = new TypeInformation[]
> {Types.INT(), Types.STRING(), Types.LONG()}
> ;
> String[] fields = new String[]\{"id", "city_id", "utime"};
> return new RowTypeInfo(types, fields);
> }
> //id,rider_id,city_id,utime
> private static SourceFunction<Row> buildWaybillStreamSource(RowTypeInfo rowTypeInfo) {
> return new SourceFunction<Row>()
> { private volatile boolean stopped = false; int count = 0; int[] ids =
> {111, 222, 333, 111}
> ;
> String[] cityIds = \{"A", "A", "B", "A"};
> @Override
> public void run(SourceContext<Row> ctx) throws Exception {
> while (!stopped) {
> int id = ids[count % ids.length];
> String cityId = cityIds[count % cityIds.length];
> Row row = new Row(4);
> row.setField(0, id);
> row.setField(1, cityId);
> row.setField(2, (long) RandomUtils.nextInt(1000, 2000));
> row.setField(3, System.currentTimeMillis());
> printRow(rowTypeInfo, row);
> ctx.collect(row);
> if (++count > 3)
> { stopped = true; }
> }
> }
> @Override
> public void cancel() \{ stopped = true; }
> };
> }
> //id,city_id,utime
> private static SourceFunction<Row> buildItemStreamSource(RowTypeInfo rowTypeInfo) {
> return new SourceFunction<Row>()
> { private volatile boolean stopped = false; int count = 0; int[] ids =
> {111, 333}
> ;
> String[] cityIds = \{"C", "D"};
> @Override
> public void run(SourceContext<Row> ctx) throws Exception {
> while (!stopped) {
> Thread.sleep(RandomUtils.nextInt(1000, 2000));
> int id = ids[count % ids.length];
> String cityId = cityIds[count % cityIds.length];
> Row row = new Row(3);
> row.setField(0, id);
> row.setField(1, cityId);
> //row.setField(2, System.currentTimeMillis());
> printRow(rowTypeInfo, row);
> ctx.collect(row);
> if (++count >= 2)
> { stopped = true; }
> }
> }
> @Override
> public void cancel()
> { stopped = true; }
> };
> }
> public static void printRow(RowTypeInfo rowTypeInfo, Row row) {
> String prefix = "";
> for (int i = 0; i < rowTypeInfo.getArity(); ++i)
> { prefix = i > 0 ? "," : ""; System.out.print(prefix + rowTypeInfo.getFieldNames()[i] + ":" + row.getField(i)); }
> System.out.println();
> }
> ------------------------------------------------------------
> Wrong Result:
> id:111,city_id:A,rider_id:1137,utime:1650979957702
> id:222,city_id:A,rider_id:1976,utime:1650979957725
> id:333,city_id:B,rider_id:1916,utime:1650979957725
> id:111,city_id:A,rider_id:1345,utime:1650979957725
> (true,A,1)
> (false,A,1)
> (true,A,2)
> (true,B,1)
> (false,A,2)
> (true,A,1)
> (false,A,1)
> (true,A,2)
> id:111,city_id:C,utime:null
> (false,A,2)
> (true,A,1)
> (true,C,1)
> (false,A,1)
> (false,C,1)
> (true,C,2)
> id:333,city_id:D,utime:null
> (false,B,1)
> (true,D,1)
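> The final result:
> C,2
> D,1
> is wrong. After (true,C,1), the aggregation receives a retraction for city A ((false,A,1)) and a second insertion for city C ((false,C,1), (true,C,2)). As a result, the row for id 222 (city A) is lost and id 111 is counted twice under C.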
> ------------------------------------------
> Right result:
> ------------------------------------------
> id:111,city_id:A,rider_id:1155,utime:1650980662019
> id:222,city_id:A,rider_id:1875,utime:1650980662042
> id:333,city_id:B,rider_id:1430,utime:1650980662042
> id:111,city_id:A,rider_id:1308,utime:1650980662042
> (true,A,1)
> (false,A,1)
> (true,A,2)
> (true,B,1)
> (false,A,2)
> (true,A,1)
> (false,A,1)
> (true,A,2)
> id:111,city_id:C,utime:null
> (false,A,2)
> (true,A,1)
> (false,A,1)
> (true,A,2)
> (false,A,2)
> (true,A,1)
> (true,C,1)
> id:333,city_id:D,utime:null
> (false,B,1)
> (true,D,1)
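> The final result:
> A,1
> C,1
> D,1
> is right. After the TopN, each id keeps exactly one row (111 -> C, 222 -> A, 333 -> D). This matches the last update for C in the trace above, (true,C,1).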
> ```
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)