Posted to user-zh@flink.apache.org by dpzhoufengdev <dp...@163.com> on 2020/11/23 08:27:23 UTC
Flink SQL: duplicate records in Kafka after sinking the results of a GROUP BY sliding-window aggregation
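After sinking the results of a Flink SQL GROUP BY sliding-window aggregation to Kafka, the output contains duplicates: every record appears twice, as two completely identical copies. What could be causing this?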
The aggregation logic:
Table tableoneHour = tableEnv.sqlQuery(
        "select appname" +
        ",productCode" +
        ",link" +
        ",count(case when nodeName = 'FailTerminateEndEvent' then 1 else null end) as errNum" +
        ",count(case when nodeName = 'EndEvent' and passStatus = 'Accept' then 1 else null end ) as passNum " +
        ",count(case when nodeName = 'EndEvent' and passStatus = 'Reject' then 1 else null end) as refNum " +
        ",count(case when nodeName = 'EndEvent' and passStatus <> 'Reject' and passStatus <> 'Accept' then 1 else null end) as processNum " +
        ",sum(case when nodeName = 'EndEvent' then loansum else 0 end ) as loansum" +
        ",count(1) as allNum " +
        ",'OneHour' as windowType " +
        ",HOP_END(rowtime, INTERVAL '1' HOUR, INTERVAL '1' HOUR) as inputtime " +
        "from table1_2 WHERE link in ('1','2','5') GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' HOUR) " +
        ",appname,productCode,link");
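In the query above, HOP is given the same slide and size (1 hour), so it behaves like a tumbling window: each row falls into exactly one window, and the windowing by itself should not duplicate rows. The plain-Java sketch below models the usual sliding-window assignment rule to make this concrete. It does not use Flink, assumes windows aligned to multiples of the slide with zero offset, and the names HopWindowSketch and windowStarts are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (no Flink dependency) of hopping-window assignment.
// Assumption: windows are aligned to multiples of the slide (zero offset).
public class HopWindowSketch {

    // Returns the start of every window [start, start + size) that contains ts.
    public static List<Long> windowStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest slide-aligned start that is <= ts (floorMod also handles negative ts).
        long lastStart = ts - Math.floorMod(ts, slide);
        // Step back one slide at a time while the window still covers ts.
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L; // one hour in milliseconds
        long ts = 5_400_000L;   // 01:30 UTC on 1970-01-01

        // Same slide and size, as in HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' HOUR)
        System.out.println(windowStarts(ts, hour, hour));     // [3600000]

        // For comparison: a 30-minute slide with a 1-hour size
        System.out.println(windowStarts(ts, hour, hour / 2)); // [5400000, 3600000]
    }
}
```

With a slide shorter than the size, each row belongs to size / slide windows, so seeing a row counted in several result windows is expected there, but not in the 1-hour/1-hour case used in this query.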
Converting the Table to a DataStream:
// union the tables computed for the different window sizes
Table tablesql = tableHalfHour.unionAll(tableoneHour).unionAll(tableoneDay);
DataStream<Tuple2<Boolean, Row>> dataStream2 = tableEnv.toRetractStream(tablesql, Row.class);
DataStream<Tuple2<Boolean, Row>> dataStream7Day = tableEnv.toRetractStream(table7Day, Row.class);
// convert each Row of the retract stream into a JSON string
// (only f1, the Row, is read; the f0 add/retract flag is ignored)
DataStream<String> reslut1 = dataStream2.map(new MapFunction<Tuple2<Boolean, Row>, String>() {
    @Override
    public String map(Tuple2<Boolean, Row> tuple2) throws Exception {
        Map<String, Object> json = new HashMap<>();
        json.put("appname", tuple2.f1.getField(0));
        json.put("productCode", tuple2.f1.getField(1));
        json.put("link", tuple2.f1.getField(2));
        json.put("errNum", tuple2.f1.getField(3));
        json.put("passNum", tuple2.f1.getField(4));
        json.put("refNum", tuple2.f1.getField(5));
        json.put("processNum", tuple2.f1.getField(6));
        json.put("loansum", tuple2.f1.getField(7));
        json.put("allNum", tuple2.f1.getField(8));
        json.put("windowType", tuple2.f1.getField(9));
        json.put("inputtime", tuple2.f1.getField(10));
        return JSON.toJSONString(json);
    }
});
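toRetractStream wraps every row in a Tuple2<Boolean, Row>: f0 is true for an insert (accumulate) message and false for a retraction of a row emitted earlier. The map function above reads only f1, so if the upstream table ever emitted retractions, a retraction would reach Kafka as an ordinary record identical to the row it retracts. Group-window aggregations such as HOP normally produce insert-only results, so this may well not be the cause here. The plain-Java sketch below (no Flink dependency; RetractSketch and ChangeMessage are hypothetical names) only illustrates what is lost when the flag is ignored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java model of a retract stream: each message carries a flag
// (true = insert, false = retraction) and a payload, analogous to the
// Tuple2<Boolean, Row> produced by toRetractStream.
public class RetractSketch {

    public static final class ChangeMessage {
        public final boolean isInsert; // plays the role of Tuple2.f0
        public final String row;       // plays the role of Tuple2.f1

        public ChangeMessage(boolean isInsert, String row) {
            this.isInsert = isInsert;
            this.row = row;
        }
    }

    // What a map that reads only f1 effectively does: every payload is kept.
    public static List<String> ignoreFlag(List<ChangeMessage> messages) {
        return messages.stream().map(m -> m.row).collect(Collectors.toList());
    }

    // Filtering on the flag first keeps only insert messages.
    public static List<String> insertsOnly(List<ChangeMessage> messages) {
        return messages.stream()
                .filter(m -> m.isInsert)
                .map(m -> m.row)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<ChangeMessage> stream = new ArrayList<>();
        stream.add(new ChangeMessage(true, "app1,cnt=1"));
        stream.add(new ChangeMessage(false, "app1,cnt=1")); // retraction of the row above
        stream.add(new ChangeMessage(true, "app1,cnt=2"));
        System.out.println(ignoreFlag(stream));  // [app1,cnt=1, app1,cnt=1, app1,cnt=2]
        System.out.println(insertsOnly(stream)); // [app1,cnt=1, app1,cnt=2]
    }
}
```

Filtering on the flag removes the identical copy a retraction would otherwise produce, but it still writes every intermediate result; a consumer that needs exactly one row per key would need upsert semantics instead.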
Sinking the results to Kafka:
reslut1.addSink(new FlinkKafkaProducer08<>("********", new SimpleStringSchema(), props1));
// reslut2 is not defined in the code shown above
reslut2.addSink(new FlinkKafkaProducer08<>("********", new SimpleStringSchema(), props1));
Every record written to Kafka shows up as two completely identical messages.
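To confirm the symptom, one could read the topic back and check whether every record occurs exactly twice, or only some of them; that distinction helps narrow down where the duplication happens. The plain-Java sketch below assumes the consumed messages are already available as a list of strings (how they are read from Kafka is left out); DuplicateCheck and its methods are hypothetical names.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Counts how often each exact message string occurs in a consumed batch.
public class DuplicateCheck {

    // Returns message -> occurrence count, preserving first-seen order.
    public static Map<String, Integer> countOccurrences(List<String> messages) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String m : messages) {
            counts.merge(m, 1, Integer::sum);
        }
        return counts;
    }

    // True if every distinct message occurs exactly `times` times
    // (vacuously true for an empty list).
    public static boolean allOccurExactly(List<String> messages, int times) {
        return countOccurrences(messages).values().stream().allMatch(c -> c == times);
    }

    public static void main(String[] args) {
        List<String> consumed = Arrays.asList(
                "{\"appname\":\"a\",\"allNum\":3}",
                "{\"appname\":\"a\",\"allNum\":3}",
                "{\"appname\":\"b\",\"allNum\":5}",
                "{\"appname\":\"b\",\"allNum\":5}");
        System.out.println(countOccurrences(consumed));
        System.out.println(allOccurExactly(consumed, 2)); // true
    }
}
```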
dpzhoufengdev
dpzhoufengdev@163.com