Posted to user-zh@flink.apache.org by 知而不惑 <ch...@qq.com.INVALID> on 2023/03/15 09:24:55 UTC
Flink CEP question
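Does anyone know whether the time window in CEP is a tumbling window or a sliding window?
I know that pattern rules such as within and oneOrMore can both be given a time window.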
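The tumbling-versus-sliding distinction can be made concrete with a minimal plain-Java sketch. This is not Flink API. The class and method names are hypothetical, timestamps are assumed to be epoch milliseconds, and the window offset is assumed to be 0. The start-of-window arithmetic is modeled on Flink's TimeWindow.getWindowStartWithOffset. A tumbling window places each event in exactly one window. A sliding window places it in size/slide overlapping windows.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java sketch (not Flink API) of how tumbling and sliding
// event-time windows assign one timestamp to windows. Offset assumed to be 0.
class WindowAssignSketch {

    // Start of the single tumbling window of length `size` that contains `ts`.
    static long tumblingStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    // Starts of every sliding window (length `size`, advancing by `slide`) that contains `ts`,
    // newest first. Window [start, start + size) contains ts iff start > ts - size.
    static List<Long> slidingStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slide);
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long ts = 45_000L; // an event at t = 45 s
        System.out.println(tumblingStart(ts, 30_000L));          // 30000, i.e. window [30 s, 60 s)
        System.out.println(slidingStarts(ts, 30_000L, 10_000L)); // [40000, 30000, 20000]
    }
}
```

With a 30 s size and a 10 s slide, the event at 45 s belongs to three overlapping windows. With 30 s tumbling windows it belongs only to [30 s, 60 s).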
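Also, I don't seem to receive your replies by email, although I do receive other people's questions and the replies to them. I subscribed through user-zh-subscribe@flink.apache.org, and I send my questions to user-zh@flink.apache.org.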
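The main reason I'm asking is that I suspect data is being lost. What I want is to collect all the data within a time period and write it to the database. My code looks like this: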
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// source omitted; it reads from Kafka
JsonDeserializationSchema<DdrAlertModel> jsonFormat = new JsonDeserializationSchema<>(DdrAlertModel.class);
DataStreamSource<DdrAlertModel> alertStream = env.fromSource(source, WatermarkStrategy.forMonotonousTimestamps(), "alert source");
// match everything within 30 s
Pattern<DdrAlertModel, DdrAlertModel> sendPattern = Pattern.<DdrAlertModel>begin("begin")
        .where(new SimpleCondition<>() {
            @Override
            public boolean filter(DdrAlertModel value) {
                return true;
            }
        }).within(Time.seconds(30)); // with this version: 84 records in, only 8 written to the database
        // I also tried this instead: 17 records written to the database
        // .oneOrMore(Time.seconds(30)).optional().greedy()
PatternStream<DdrAlertModel> patternStream = CEP.pattern(alertStream, sendPattern);
// select: return the list of all events matched by "begin"
patternStream.select(new PatternSelectFunction<DdrAlertModel, List<DdrAlertModel>>() {
            @Override
            public List<DdrAlertModel> select(Map<String, List<DdrAlertModel>> pattern) {
                return pattern.get("begin");
            }
        })
        // do some processing on the data
        .map(new MapFunction<List<DdrAlertModel>, List<Incident>>() {
            @Override
            public List<Incident> map(List<DdrAlertModel> value) {
                return value.stream().map(v -> {
                    try {
                        return sensitiveSendIncident(v, mapper);
                    } catch (JsonProcessingException e) {
                        throw new RuntimeException(e);
                    }
                }).collect(Collectors.toList());
            }
        })
        // flatten the lists, then sink
        .flatMap(new FlatMapFunction<List<Incident>, Incident>() {
            @Override
            public void flatMap(List<Incident> value, Collector<Incident> out) throws Exception {
                for (Incident incident : value) {
                    out.collect(incident);
                }
            }
        }).addSink(IncidentSink.getIncidentSink());
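A minimal plain-Java sketch can show what "collect all the data in each 30-second period" would produce. It is not Flink API and does not model CEP. The class and method names are hypothetical, and the 84 events at one-second spacing are made-up data chosen to match the 84 records in the code comments. Each event is assigned to exactly one 30-second tumbling batch, so the batch sizes always add up to the number of input events.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java sketch (not Flink API): group event timestamps into
// 30 s tumbling batches, so every event ends up in exactly one batch.
class TumblingBatchSketch {

    static final long WINDOW_MS = 30_000L; // assumed 30 s window, like Time.seconds(30)

    // Maps each window start to the timestamps in [start, start + WINDOW_MS), ordered by start.
    static Map<Long, List<Long>> batch(List<Long> timestamps) {
        Map<Long, List<Long>> batches = new TreeMap<>();
        for (long ts : timestamps) {
            long start = ts - Math.floorMod(ts, WINDOW_MS);
            batches.computeIfAbsent(start, k -> new ArrayList<>()).add(ts);
        }
        return batches;
    }

    public static void main(String[] args) {
        // 84 made-up events, one per second starting at t = 0
        List<Long> events = new ArrayList<>();
        for (long i = 0; i < 84; i++) {
            events.add(i * 1_000L);
        }
        int total = 0;
        for (Map.Entry<Long, List<Long>> e : batch(events).entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue().size() + " events");
            total += e.getValue().size();
        }
        System.out.println("total = " + total); // equals the input count: nothing is dropped
    }
}
```

This prints three batches of 30, 30 and 24 events, for a total of 84.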