You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Federico D'Ambrosio <fe...@gmail.com> on 2019/07/25 14:50:37 UTC
Help with the correct Event Pattern
Hello everyone,
I need a bit of help concerning a correct formulation for a Complex Event
Pattern, using CEP.
I have a stream of events which once keyed for ids, they may look like this:
a b1 b2 b3 b4 b5 c1 c2 d1 d2 c3 c4 e1 e2 f1
what I want to achieve is to get, from a formulation similar to this:
[1] b c e
this:
b1 c1 e1
that is, for each input stream, have an output composed of only the first
appearance of events of class b, c and e.
I realize that a pattern formulated like [1] would also match:
b1 c2 e1, b1 c2 e2 and so on, so that I would need to refine it.
So, I tried using oneOrMore(), consecutive() and
AfterMatchSkipStrategy.skypToFirst, like this:
val b = Pattern
.begin[Event]("b")
.where((value, _) => value.state == "b")
.oneOrMore().consecutive()
val c = Pattern
.begin[Event]("c")
.where((value, _) => value.state == "c")
.oneOrMore().consecutive()
val e = Pattern
.begin[Event]("e", AfterMatchSkipStrategy.skipToFirst("b"))
.where((value, _) => value.state == "e")
.oneOrMore().consecutive()
val pattern: Pattern[Event, _] =
b.followedBy(c).followedBy(e)
In the process function I would do something like this:
override def processMatch(matches: util.Map[String, util.List[Event]],
ctx: PatternProcessFunction.Context,
out: Collector[OutputEvent]): Unit = {
val bEvent = matches.get("b").asScala.head
val cEvent = matches.get("c").asScala.head
val eEvent = matches.get("e").asScala.head
out.collect(OutputEvent(bEvent, cEvent, eEvent))
}
But unfortunately it doesn't work like I want, which makes me think I'm
missing something within the functionalities of Flink CEP.
What's the best way to achieve what I want? Is it possible?
Should I even use any AfterMatchSkipStrategy?
Thank you,
Federico D'Ambrosio
Re: Help with the correct Event Pattern
Posted by Dawid Wysakowicz <dw...@apache.org>.
Have you tried pattern like:
/Pattern.begin[Event]("b",
//AfterMatchSkipStrategy.skipPastLast//).where(...).followedBy("c").where(...).followedBy("e").where(...)/
The method followedBy(Pattern) constructs a Pattern with a subGroup
pattern. The skip strategy there does not have any effect.
Best,
Dawid
On 25/07/2019 16:50, Federico D'Ambrosio wrote:
> Hello everyone,
>
> I need a bit of help concerning a correct formulation for a Complex
> Event Pattern, using CEP.
>
> I have a stream of events which once keyed for ids, they may look like
> this:
>
> a b1 b2 b3 b4 b5 c1 c2 d1 d2 c3 c4 e1 e2 f1
>
> what I want to achieve is to get, from a formulation similar to this:
>
> [1] b c e
>
> this:
>
> b1 c1 e1
>
> that is, for each input stream, have an output composed of only the
> first appearance of events of class b, c and e.
>
> I realize that a pattern formulated like [1] would also match:
>
> b1 c2 e1, b1 c2 e2 and so on, so that I would need to refine it.
>
> So, I tried using oneOrMore(), consecutive() and
> AfterMatchSkipStrategy.skypToFirst, like this:
>
> val b = Pattern
> .begin[Event]("b")
> .where((value, _) => value.state == "b")
> .oneOrMore().consecutive()
>
> val c = Pattern
> .begin[Event]("c")
> .where((value, _) => value.state == "c")
> .oneOrMore().consecutive()
>
> val e = Pattern
> .begin[Event]("e", AfterMatchSkipStrategy.skipToFirst("b"))
> .where((value, _) => value.state == "e")
> .oneOrMore().consecutive()
>
> val pattern: Pattern[Event, _] =
> b.followedBy(c).followedBy(e)
>
> In the process function I would do something like this:
>
> override def processMatch(matches: util.Map[String, util.List[Event]],
> ctx: PatternProcessFunction.Context,
> out: Collector[OutputEvent]): Unit = {
>
> val bEvent = matches.get("b").asScala.head
> val cEvent = matches.get("c").asScala.head
> val eEvent = matches.get("e").asScala.head
>
> out.collect(OutputEvent(bEvent, cEvent, eEvent))
> }
>
> But unfortunately it doesn't work like I want, which makes me think
> I'm missing something within the functionalities of Flink CEP.
>
> What's the best way to achieve what I want? Is it possible?
> Should I even use any AfterMatchSkipStrategy?
>
> Thank you,
> Federico D'Ambrosio