Posted to user@flink.apache.org by Federico D'Ambrosio <fe...@gmail.com> on 2019/07/25 14:50:37 UTC

Help with the correct Event Pattern

Hello everyone,

I need a bit of help concerning a correct formulation for a Complex Event
Pattern, using CEP.

I have a stream of events which, once keyed by id, may look like this:

a b1 b2 b3 b4 b5 c1 c2 d1 d2 c3 c4 e1 e2 f1

what I want to achieve is to get, from a formulation similar to this:

[1] b c e

this:

b1 c1 e1

that is, for each input stream, have an output composed of only the first
appearance of events of class b, c and e.

I realize that a pattern formulated like [1] would also match:

b1 c2 e1, b1 c2 e2 and so on, so that I would need to refine it.
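
To make the concern above concrete, here is a minimal sketch in plain Scala. It does not use Flink; it only enumerates every b, c, e triple that respects stream order, which is roughly what a pattern would yield if each step could pair with any later event (in Flink CEP terms that would be followedByAny, as far as I understand it). The names AnyOrderTriples, Event, parse and triples are made up for this illustration.

```scala
object AnyOrderTriples {
  // Hypothetical stand-in for the Event type in the question.
  final case class Event(state: String, label: String)

  // "b1" becomes Event(state = "b", label = "b1").
  def parse(s: String): Vector[Event] =
    s.split(" ").toVector.map(t => Event(t.take(1), t))

  // Every (b, c, e) combination in which b precedes c and c precedes e.
  def triples(stream: Vector[Event]): Vector[String] =
    for {
      (b, i) <- stream.zipWithIndex if b.state == "b"
      (c, j) <- stream.zipWithIndex if c.state == "c" && j > i
      (e, k) <- stream.zipWithIndex if e.state == "e" && k > j
    } yield s"${b.label} ${c.label} ${e.label}"
}
```

For the sample stream this yields 5 x 4 x 2 = 40 triples, starting with b1 c1 e1 and including b1 c2 e1 and b1 c2 e2, so the bare "b c e" formulation is far too permissive under that reading.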

So, I tried using oneOrMore(), consecutive() and
AfterMatchSkipStrategy.skipToFirst, like this:

val b = Pattern
  .begin[Event]("b")
  .where((value, _) => value.state == "b")
  .oneOrMore().consecutive()

val c = Pattern
  .begin[Event]("c")
  .where((value, _) => value.state == "c")
  .oneOrMore().consecutive()

val e = Pattern
  .begin[Event]("e", AfterMatchSkipStrategy.skipToFirst("b"))
  .where((value, _) => value.state == "e")
  .oneOrMore().consecutive()

val pattern: Pattern[Event, _] =
  b.followedBy(c).followedBy(e)

In the process function I would do something like this:

override def processMatch(matches: util.Map[String, util.List[Event]],
                          ctx: PatternProcessFunction.Context,
                          out: Collector[OutputEvent]): Unit = {

    val bEvent = matches.get("b").asScala.head
    val cEvent = matches.get("c").asScala.head
    val eEvent = matches.get("e").asScala.head

    out.collect(OutputEvent(bEvent, cEvent, eEvent))
}

But unfortunately it doesn't work the way I want, which makes me think I'm
missing something about how Flink CEP works.

What's the best way to achieve what I want? Is it possible?
Should I even use any AfterMatchSkipStrategy?

Thank you,
Federico D'Ambrosio

Re: Help with the correct Event Pattern

Posted by Dawid Wysakowicz <dw...@apache.org>.
Have you tried a pattern like:

Pattern.begin[Event]("b", AfterMatchSkipStrategy.skipPastLastEvent())
  .where(...).followedBy("c").where(...).followedBy("e").where(...)

The method followedBy(Pattern) constructs a Pattern with a subgroup
pattern. A skip strategy set on that subgroup, as in your "e" pattern,
does not have any effect.
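
As a rough illustration of why a skip-past-last-event strategy on the first pattern should leave only b1 c1 e1, here is a simplified model in plain Scala. It is not Flink code and does not reproduce the CEP engine. It assumes that followedBy picks the next matching event after the previous step (relaxed contiguity), and that AfterMatchSkipStrategy.skipPastLastEvent() discards any candidate that starts before an already emitted match has ended. SkipModel and all of its members are hypothetical names.

```scala
object SkipModel {
  // Hypothetical stand-in for the Event type in the question.
  final case class Event(state: String, label: String)

  def parse(s: String): Vector[Event] =
    s.split(" ").toVector.map(t => Event(t.take(1), t))

  // Index of the first event after position `from` with the given state.
  private def next(stream: Vector[Event], from: Int, state: String): Option[Int] = {
    val i = stream.indexWhere(_.state == state, from + 1)
    if (i >= 0) Some(i) else None
  }

  // Without skipping: one candidate per "b" start, each taking the next "c"
  // and then the next "e". Candidates are lists of stream positions.
  def candidates(stream: Vector[Event]): Vector[Vector[Int]] =
    stream.indices.toVector.filter(i => stream(i).state == "b").flatMap { b =>
      (for {
        c <- next(stream, b, "c")
        e <- next(stream, c, "e")
      } yield Vector(b, c, e)).toList
    }

  // Modelled skip: keep a candidate only if it starts after the last kept
  // match ended. Candidates are assumed to be ordered by start position.
  def skipPastLastEvent(ms: Vector[Vector[Int]]): Vector[Vector[Int]] =
    ms.foldLeft(Vector.empty[Vector[Int]]) { (kept, m) =>
      if (kept.isEmpty || m.head > kept.last.last) kept :+ m else kept
    }

  def render(stream: Vector[Event], ms: Vector[Vector[Int]]): Vector[String] =
    ms.map(_.map(i => stream(i).label).mkString(" "))
}
```

On the stream from the question, candidates gives b1 c1 e1 through b5 c1 e1 (one per b event), and applying skipPastLastEvent to them keeps only b1 c1 e1, because every other candidate starts before e1.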

Best,

Dawid

