Posted to dev@flink.apache.org by Dominik Wosiński <wo...@gmail.com> on 2020/03/11 20:18:02 UTC

AfterMatchSkipStrategy for timed out patterns

Hey all,

I was wondering whether, in CEP, the *AfterMatchSkipStrategy* is applied
during matching or whether the results are simply removed after the match. The
question comes from some experiments I was doing with CEP. Say I have
readings from a sensor and I want to detect events above a certain
threshold. So I have something like the following:

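import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.windowing.time.Time

// AccelVector, Threshold and EventPatternName are defined elsewhere in the job.
Pattern.begin[AccelVector]("beginning", AfterMatchSkipStrategy.skipPastLastEvent())
  .where(_.data() < Threshold)                // optional reading below the threshold
  .optional
  .followedBy(EventPatternName)
  .where(event => event.data() >= Threshold)  // consecutive readings at or above it
  .oneOrMore
  .greedy
  .consecutive()
  .followedBy("end")
  .where(_.data() < Threshold)                // readings back below the threshold
  .oneOrMore
  .within(Time.minutes(1))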


The thing is that sensors sometimes stop sending data, or the data gets
lost, so I would also like to emit events whose patterns have technically
timed out. I have created a PatternProcessFunction that simply receives the
timed-out partial matches and checks for the *EventPatternName* part.
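For readers less familiar with this callback: in Flink, timed-out partial matches are delivered to processTimedOutMatch when the PatternProcessFunction also implements TimedOutPartialMatchHandler, and each partial match arrives as a map from pattern name to the events matched under that name. The plain-Scala sketch below models only the "check for the EventPatternName part" step so that it runs without Flink. TimedOutHandlerModel, Reading, PartialMatch, eventPart and the value "event" for EventPatternName are hypothetical names chosen for illustration, not the author's code.

```scala
object TimedOutHandlerModel {
  // Hypothetical stand-in for AccelVector: a timestamped sensor reading.
  final case class Reading(timestamp: Long, data: Int)

  // Hypothetical value for the EventPatternName constant used in the pattern.
  val EventPatternName = "event"

  // A (partial) match keyed by pattern name, mirroring the Map<String, List<IN>>
  // that Flink passes to processTimedOutMatch.
  type PartialMatch = Map[String, Seq[Reading]]

  // Returns the above-threshold part of a timed-out partial match, or None if the
  // match timed out before reaching the EventPatternName stage.
  def eventPart(partial: PartialMatch): Option[Seq[Reading]] =
    partial.get(EventPatternName).filter(_.nonEmpty)
}
```

A partial match that timed out before any above-threshold reading arrived has no EventPatternName entry, so eventPart returns None for it.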

It works fine, but I have noticed a weird behavior: the events that get
passed to *processTimedOutMatch* are repeated, as if there were no
*AfterMatchSkipStrategy*.

So, for example, say Threshold = 200 and I have the following events for
one of the sensors:
Event1 (timestamp = 1, data = 10)
Event2 (timestamp = 2, data = 250)
Event3 (timestamp = 3, data = 300)
Event4 (timestamp = 4, data = 350)
Event5 (timestamp = 5, data = 400)
Event6 (timestamp = 6, data = 450)

After that, this sensor stops sending data, but other sensors keep sending,
so the watermark keeps advancing, which, as expected, causes the pattern to
time out. The issue I have is that *processTimedOutMatch* gets called
multiple times: first for the whole sequence Event1 to Event6, and then each
call drops one more event from the front, so next I get Event2 to Event6,
then Event3 to Event6, and so on, down to just Event6.
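The repetition can be reproduced with a deliberately simplified, plain-Scala model of the pattern. This is not Flink's NFA, and it ignores the one-or-more "end" beyond its first reading. Each reading may open its own partial match because "beginning" is optional; a completed match discards all other partial matches, mimicking skipPastLastEvent; and when the input ends (standing in for the watermark passing the within() window) every remaining partial match is reported as timed out, with no skip strategy applied. SkipOnlyOnMatchModel, Reading, Outcome and feed are hypothetical names.

```scala
object SkipOnlyOnMatchModel {
  // Hypothetical reading type: a name for display plus the measured value.
  final case class Reading(name: String, data: Int)

  // One in-flight partial match: readings taken so far, and whether the
  // at-or-above-threshold run has started.
  private final case class Partial(readings: Vector[Reading], seenAbove: Boolean)

  final case class Outcome(matches: Vector[Vector[Reading]], timedOut: Vector[Vector[Reading]])

  // Simplified model of "optional reading below threshold, then a consecutive run
  // at or above it, then a reading below it" for ONE sensor. Not Flink's NFA.
  def feed(readings: Seq[Reading], threshold: Int): Outcome = {
    var partials = Vector.empty[Partial]
    var matches = Vector.empty[Vector[Reading]]
    for (r <- readings) {
      val above = r.data >= threshold
      partials.find(p => p.seenAbove && !above) match {
        case Some(p) =>
          // A below-threshold reading after the run completes the earliest partial match.
          matches = matches :+ (p.readings :+ r)
          // Skip past last event: every other partial match started before r, so drop them all.
          partials = Vector.empty
        case None =>
          // Above-threshold readings extend every partial match; a below-threshold
          // reading before any run is skipped (relaxed contiguity of followedBy).
          partials = partials.map(p => if (above) Partial(p.readings :+ r, seenAbove = true) else p)
          // Because "beginning" is optional, every reading also opens a new partial match.
          partials = partials :+ Partial(Vector(r), seenAbove = above)
      }
    }
    // End of input stands in for the watermark passing within(): whatever is still
    // open times out, and no skip strategy is applied to it.
    Outcome(matches, partials.map(_.readings))
  }
}
```

Feeding Event1 to Event6 with a threshold of 200 yields no full match and six timed-out partial matches, from Event1 to Event6 down to just Event6. If a reading below 200 follows the run instead, the model completes a single match and leaves nothing to time out.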

My understanding is that *AfterMatchSkipStrategy* should wipe out those
partial matches. Or does it work differently for timed-out matches?

Thanks in advance,
Best Regards,
Dom.

Re: AfterMatchSkipStrategy for timed out patterns

Posted by Till Rohrmann <tr...@apache.org>.
Hi Dominik,

at the moment, the AfterMatchSkipStrategy is only applied to fully matching
sequences. In case of a timeout, the AfterMatchSkipStrategy is ignored
because, technically, a timed-out sequence is not a match.

Cheers,
Till
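Since the skip strategy is not applied to timeouts, one possible workaround (not suggested in the thread, and only a sketch) is to apply the equivalent of skip-past-last-event to the timed-out partial matches yourself: keep a timed-out partial match only if its first event comes strictly after the last event of the previously kept one. The sketch assumes event timestamps are unique per sensor and that, as in the example, the longest partial match is reported first. In a real job the last kept timestamp would have to be tracked per sensor, for example in keyed state of an operator downstream of the CEP operator, which is not shown. TimedOutDedup, Reading and skipPastLastEvent are hypothetical names.

```scala
object TimedOutDedup {
  // Hypothetical stand-in for a sensor reading; only the timestamp matters here.
  final case class Reading(timestamp: Long, data: Int)

  // Emulates skip-past-last-event for timed-out partial matches of ONE sensor:
  // a partial match is kept only if its first reading comes strictly after the
  // last reading of the previously kept partial match. Assumes unique timestamps
  // and that partial matches arrive longest first, as in the example.
  def skipPastLastEvent(timedOut: Seq[Seq[Reading]]): Seq[Seq[Reading]] = {
    val start = (Vector.empty[Seq[Reading]], Long.MinValue)
    val (kept, _) = timedOut.filter(_.nonEmpty).foldLeft(start) {
      case ((acc, lastKept), partial) =>
        if (partial.head.timestamp > lastKept) (acc :+ partial, partial.last.timestamp)
        else (acc, lastKept) // overlaps an already kept partial match: drop it
    }
    kept
  }
}
```

For the six overlapping timed-out partial matches from the example, only Event1 to Event6 is kept.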

On Mon, Mar 16, 2020 at 5:39 PM Dominik Wosiński <wo...@gmail.com> wrote:


Fwd: AfterMatchSkipStrategy for timed out patterns

Posted by Dominik Wosiński <wo...@gmail.com>.