You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "zhangbin (Jira)" <ji...@apache.org> on 2022/04/26 14:11:00 UTC

[jira] [Created] (FLINK-27418) Flink SQL TopN result is wrong

zhangbin created FLINK-27418:
--------------------------------

             Summary: Flink SQL TopN result is wrong
                 Key: FLINK-27418
                 URL: https://issues.apache.org/jira/browse/FLINK-27418
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API
    Affects Versions: 1.14.3, 1.12.2
         Environment: Flink 1.12.2 and Flink 1.14.3; the test result is intermittently wrong on both
            Reporter: zhangbin


Running the same Flink SQL TopN query multiple times produces different results: sometimes the result is correct and sometimes it is incorrect.

Example:

/**
 * Reproduces FLINK-27418. Two test sources are registered as the {@code waybill} and
 * {@code item} views, each with an extra processing-time attribute. The query then:
 * <ol>
 *   <li>left-joins them on {@code id},</li>
 *   <li>keeps the latest row per {@code id} via {@code ROW_NUMBER() ... WHERE rno = 1},</li>
 *   <li>counts the surviving rows per {@code city_id}.</li>
 * </ol>
 * The resulting retract stream is printed to stderr, and the job runs with parallelism 1.
 *
 * <p>A failure of the Flink job is rethrown so that the test fails instead of silently passing.
 */
@Test
public void testFlinkSqlJoinRetract() {
    EnvironmentSettings settings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();

    StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    streamEnv.setParallelism(1);
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
    tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(10000));

    RowTypeInfo waybillTableTypeInfo = buildWaybillTableTypeInfo();
    RowTypeInfo itemTableTypeInfo = buildItemTableTypeInfo();
    SourceFunction<Row> waybillSourceFunction = buildWaybillStreamSource(waybillTableTypeInfo);
    SourceFunction<Row> itemSourceFunction = buildItemStreamSource(itemTableTypeInfo);
    String waybillTable = "waybill";
    String itemTable = "item";

    DataStreamSource<Row> waybillStream = streamEnv.addSource(
            waybillSourceFunction,
            waybillTable,
            waybillTableTypeInfo);
    DataStreamSource<Row> itemStream = streamEnv.addSource(
            itemSourceFunction,
            itemTable,
            itemTableTypeInfo);

    // Expose every physical field plus a processing-time attribute named "proctime".
    Expression[] waybillFields = ExpressionParser
            .parseExpressionList(String.join(",", waybillTableTypeInfo.getFieldNames())
                    + ",proctime.proctime").toArray(new Expression[0]);
    Expression[] itemFields = ExpressionParser
            .parseExpressionList(
                    String.join(",", itemTableTypeInfo.getFieldNames()) + ",proctime.proctime")
            .toArray(new Expression[0]);

    tableEnv.createTemporaryView(waybillTable, waybillStream, waybillFields);
    tableEnv.createTemporaryView(itemTable, itemStream, itemFields);

    String sql =
            "select \n"
                    + " city_id, \n"
                    + " count(*) as cnt\n"
                    + "from (\n"
                    + " select id,city_id\n"
                    + " from (\n"
                    + " select \n"
                    + " id,\n"
                    + " city_id,\n"
                    + " row_number() over(partition by id order by utime desc ) as rno \n"
                    + " from (\n"
                    + " select \n"
                    + " waybill.id as id,\n"
                    + " coalesce(item.city_id, waybill.city_id) as city_id,\n"
                    + " waybill.utime as utime \n"
                    + " from waybill left join item \n"
                    + " on waybill.id = item.id \n"
                    + " ) \n"
                    + " )\n"
                    + " where rno =1\n"
                    + ")\n"
                    + "group by city_id";

    Table table = tableEnv.sqlQuery(sql);
    DataStream<Tuple2<Boolean, Row>> rowDataStream = tableEnv.toRetractStream(table, Row.class);
    rowDataStream.printToErr();
    try {
        streamEnv.execute();
    } catch (Exception e) {
        // Do not swallow the failure: a job that fails must make the test fail.
        throw new IllegalStateException("Flink job execution failed", e);
    }
}

private static RowTypeInfo buildWaybillTableTypeInfo()

{ TypeInformation[] types = new TypeInformation[]\\{Types.INT(), Types.STRING(), Types.LONG(), Types.LONG()}

;
String[] fields = new String[]\{"id", "city_id", "rider_id", "utime"};
return new RowTypeInfo(types, fields);
}

private static RowTypeInfo buildItemTableTypeInfo()

{ TypeInformation[] types = new TypeInformation[]\\{Types.INT(), Types.STRING(), Types.LONG()}

;
String[] fields = new String[]\{"id", "city_id", "utime"};
return new RowTypeInfo(types, fields);
}

//id,rider_id,city_id,utime
private static SourceFunction<Row> buildWaybillStreamSource(RowTypeInfo rowTypeInfo) {
return new SourceFunction<Row>()

{ private volatile boolean stopped = false; int count = 0; int[] ids = \\{111, 222, 333, 111}

;
String[] cityIds = \{"A", "A", "B", "A"};

@Override
public void run(SourceContext<Row> ctx) throws Exception {
while (!stopped) {
int id = ids[count % ids.length];
String cityId = cityIds[count % cityIds.length];
Row row = new Row(4);
row.setField(0, id);
row.setField(1, cityId);
row.setField(2, (long) RandomUtils.nextInt(1000, 2000));
row.setField(3, System.currentTimeMillis());
printRow(rowTypeInfo, row);
ctx.collect(row);
if (++count > 3) {
stopped = true;
}
}
}

@Override
public void cancel() {
stopped = true;
}
};
}

//id,city_id,utime
private static SourceFunction<Row> buildItemStreamSource(RowTypeInfo rowTypeInfo) {
return new SourceFunction<Row>()

{ private volatile boolean stopped = false; int count = 0; int[] ids = \\{111, 333}

;
String[] cityIds = \{"C", "D"};

@Override
public void run(SourceContext<Row> ctx) throws Exception {
while (!stopped) {
Thread.sleep(RandomUtils.nextInt(1000, 2000));
int id = ids[count % ids.length];
String cityId = cityIds[count % cityIds.length];
Row row = new Row(3);
row.setField(0, id);
row.setField(1, cityId);
//row.setField(2, System.currentTimeMillis());
printRow(rowTypeInfo, row);
ctx.collect(row);
if (++count >= 2)

{ stopped = true; } }

}

@Override
public void cancel() {
stopped = true;
}
};
}

/**
 * Prints {@code row} to stdout as comma-separated {@code name:value} pairs, e.g.
 * {@code id:111,city_id:A}. Field names are taken from {@code rowTypeInfo}.
 *
 * <p>The whole line is built first and written with a single {@code println} call. Several
 * sources call this method, so separate {@code print} calls from different threads could
 * otherwise interleave in the middle of a line.
 *
 * @param rowTypeInfo type info supplying the arity and field names
 * @param row the row to print; null field values print as {@code null}
 */
public static void printRow(RowTypeInfo rowTypeInfo, Row row) {
    String[] fieldNames = rowTypeInfo.getFieldNames();
    StringBuilder line = new StringBuilder();
    for (int i = 0; i < rowTypeInfo.getArity(); ++i) {
        if (i > 0) {
            line.append(',');
        }
        line.append(fieldNames[i]).append(':').append(row.getField(i));
    }
    System.out.println(line);
}

------------------------------------------------------------

Wrong Result:

id:111,city_id:A,rider_id:1137,utime:1650979957702
id:222,city_id:A,rider_id:1976,utime:1650979957725
id:333,city_id:B,rider_id:1916,utime:1650979957725
id:111,city_id:A,rider_id:1345,utime:1650979957725

(true,A,1)
(false,A,1)
(true,A,2)
(true,B,1)
(false,A,2)
(true,A,1)
(false,A,1)
(true,A,2)
id:111,city_id:C,utime:null

(false,A,2)
(true,A,1)
(true,C,1)
(false,A,1)
(false,C,1)
(true,C,2)

id:333,city_id:D,utime:null
(false,B,1)
(true,D,1)

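The final result:
C,2
D,1
is wrong: city A is missing entirely (waybill 222 has no matching item, so it should keep city A), and city C is counted twice.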
------------------------------------------

Right result:

------------------------------------------

id:111,city_id:A,rider_id:1155,utime:1650980662019
id:222,city_id:A,rider_id:1875,utime:1650980662042
id:333,city_id:B,rider_id:1430,utime:1650980662042
id:111,city_id:A,rider_id:1308,utime:1650980662042

(true,A,1)
(false,A,1)
(true,A,2)
(true,B,1)
(false,A,2)
(true,A,1)
(false,A,1)
(true,A,2)

id:111,city_id:C,utime:null
(false,A,2)
(true,A,1)
(false,A,1)
(true,A,2)
(false,A,2)
(true,A,1)
(true,C,1)

id:333,city_id:D,utime:null
(false,B,1)
(true,D,1)

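The final result:
A,1
C,1
D,1
is right. After the rno = 1 deduplication, each waybill id contributes exactly one row: 222 -> A, 111 -> C and 333 -> D. This matches the last retract messages above, which end with (true,A,1), (true,C,1) and (true,D,1).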



--
This message was sent by Atlassian Jira
(v8.20.7#820007)