You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 知而不惑 <ch...@qq.com.INVALID> on 2023/03/15 09:24:55 UTC

flink cep问题

请问有没有大佬知道cep里面开窗是滚动窗口和滑动窗口?
已知within 和oneOrMore 这种匹配规则都是可以传递时间开窗的


另外我好像收不到你们的邮件回复,但是我可以收到其他人的问题和回复。我是通过订阅地址订阅的:user-zh-subscribe@flink.apache.org&nbsp; &nbsp; 发送问题是这个地址:user-zh@flink.apache.org


问这个问题的主要原因就是我感觉有数据丢失,我是想统计一段时间内所有的数据,并且将这些入库,我的写法是这样的:




StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 省略&nbsp; source 从kafka读
JsonDeserializationSchema<DdrAlertModel&gt; jsonFormat = new JsonDeserializationSchema<&gt;(DdrAlertModel.class);


DataStreamSource<DdrAlertModel&gt; alertStream = env.fromSource(source, WatermarkStrategy.forMonotonousTimestamps(), "alert source");




// 匹配30s所有
Pattern<DdrAlertModel, DdrAlertModel&gt; sendPattern = Pattern.<DdrAlertModel&gt;begin("begin")
&nbsp; &nbsp; &nbsp; &nbsp; .where(new SimpleCondition<&gt;() {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public boolean filter(DdrAlertModel value) {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return true;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; }).within(Time.seconds(30));&nbsp; // 这种写法84条数据 落库8条
&nbsp; &nbsp; &nbsp; &nbsp; // 这种也试过了,落库17条
&nbsp; &nbsp; &nbsp; &nbsp; // .oneOrMore(Time.seconds(30)).optional().greedy()




PatternStream<DdrAlertModel&gt; patternStream = CEP.pattern(alertStream, sendPattern);


// select拿所有列表
patternStream.select(new PatternSelectFunction<DdrAlertModel, List<DdrAlertModel&gt;&gt;() {
&nbsp; &nbsp; @Override
&nbsp; &nbsp; public List<DdrAlertModel&gt; select(Map<String, List<DdrAlertModel&gt;&gt; pattern) {
&nbsp; &nbsp; &nbsp; &nbsp; return pattern.get("begin");
&nbsp; &nbsp; }
})


// 对数据做一定处理
.map(new MapFunction<List<DdrAlertModel&gt;, List<Incident&gt;&gt;() {
&nbsp; &nbsp; @Override
&nbsp; &nbsp; public List<Incident&gt; map(List<DdrAlertModel&gt; value) {
&nbsp; &nbsp; &nbsp; &nbsp; return value.stream().map(v -&gt; {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; try {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return sensitiveSendIncident(v, mapper);
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; } catch (JsonProcessingException e) {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; throw new RuntimeException(e);
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; }).collect(Collectors.toList());
&nbsp; &nbsp; }
})
// 打平后sink
.flatMap(new FlatMapFunction<List<Incident&gt;, Incident&gt;() {
&nbsp; &nbsp; @Override
&nbsp; &nbsp; public void flatMap(List<Incident&gt; value, Collector<Incident&gt; out) throws Exception {
&nbsp; &nbsp; &nbsp; &nbsp; for (Incident incident : value) {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; out.collect(incident);
&nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; }
}).addSink(IncidentSink.getIncidentSink());