Posted to user@flink.apache.org by Elias Levy <fe...@gmail.com> on 2017/04/26 05:48:24 UTC

CEP join across events

There doesn't appear to be a way to join events across conditions using the
CEP library.

Consider events of the form (type, value_a, value_b) on a stream keyed by
the value_a field.

Under 1.2 you can create a pattern that, for a given value_a (as specified
by the stream key), matches if an event of type 1 is followed by an
event of type 2 (e.g.
begin("foo").where(_.type==1).followedBy("bar").where(_.type==2)).  But this
will return a match regardless of whether value_b in the first event
matches value_b in the second event.

The 1.3 snapshot introduces iterative conditions, but this is insufficient.  In
1.3 you can do:

begin("foo").where(_.type==1).followedBy("bar").where(
    (v, ctx) => {
       v.type == 2 &&
       ctx.getEventsForPattern("foo").asScala.exists(prev => prev.value_b == v.value_b)
    })

This will accept the current event if any previous event had a value_b
that matches the current event's. But the matches will include all previous
events, even those whose value_b did not match the current event's, instead
of only the previous event whose value_b equals the current event's.

Is there a way to output only the match where the previous event matches the
current event's value_b (e.g. foo == (type=1, value_a=K, value_b=X) and bar
== (type=2, value_a=K, value_b=X))?

Re: CEP join across events

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Elias,

I think this is a really interesting suggestion for the case where you do not have an “accumulating”
value. But imagine that you want to accept the “next” element only if the sum of all the previous ones
is less than Y. To support a similar syntax with an accumulator, we would have to add more methods with
additional arguments, right?
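The difference between a pairwise relation and an accumulating condition can be sketched in plain Scala. This is a toy model, not Flink code: Event, sameKey, sumBelow and accumulate are hypothetical names, and the `previous` argument stands in for what ctx.getEventsForPattern(...) would return for a looping pattern within one branch.

```scala
object ConditionShapes {
  // Hypothetical event with a grouping key and a numeric payload.
  final case class Event(key: String, value: Int)

  // Pairwise shape (like the proposed relatedTo): compares `next` with one earlier event.
  def sameKey(prev: Event, next: Event): Boolean = prev.key == next.key

  // Accumulating shape: accept `next` only if every earlier event shares its key
  // and the sum of all earlier events in this branch is below `limit`.
  // `previous` stands in for ctx.getEventsForPattern(...).
  def sumBelow(limit: Int)(next: Event, previous: Iterable[Event]): Boolean =
    previous.forall(p => sameKey(p, next)) && previous.iterator.map(_.value).sum < limit

  // Feeds events through the condition one at a time, binding each accepted one.
  def accumulate(events: Seq[Event], limit: Int): List[Event] =
    events.foldLeft(List.empty[Event]) { (bound, e) =>
      if (sumBelow(limit)(e, bound)) bound :+ e else bound
    }
}
```

A pairwise check like sameKey only needs one earlier event, which a relatedTo-style method could supply; the running sum in sumBelow needs the whole Iterable, which a two-argument relation cannot express on its own.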

For a first release, we opted for the simplest solution so that we can gather more information on
how people intend to use the new features. That said, I really think that it is an interesting and
more intuitive syntax, so could you open a JIRA issue so that we can move the discussion there? If you
prefer, I can open it for you.

Thanks a lot for the suggestion,
Kostas

> On Apr 28, 2017, at 1:09 AM, Elias Levy <fe...@gmail.com> wrote:
> 
> It would be useful if there were a cleaner syntax for specifying relationships between matched events, as in an SQL join, particularly for conditions with a quantifier of one.
> 
> At the moment you have to do something like
> 
>     Pattern
>       .begin[Foo]("first")
>         .where( first => first.baz == 1 )
>       .followedBy("next")
>         .where({ (next, ctx) =>
>           // "first" has a quantifier of one, so this Iterable holds exactly one event
>           val first = ctx.getEventsForPattern("first").iterator.next()
>           first.bar == next.bar && next.boo == "x"
>         })
> 
> which is not very clean.  It would be friendlier if you could do something like:
> 
>     Pattern
>       .begin[Foo]("first")
>         .where( first => first.baz == 1 )
>       .followedBy("next")
>         .relatedTo("first", { (first, next) => first.bar == next.bar })
>         .where( next => next.boo == "x" )


Re: CEP join across events

Posted by Elias Levy <fe...@gmail.com>.
It would be useful if there were a cleaner syntax for specifying
relationships between matched events, as in an SQL join, particularly for
conditions with a quantifier of one.

At the moment you have to do something like

    Pattern
      .begin[Foo]("first")
        .where( first => first.baz == 1 )
      .followedBy("next")
        .where({ (next, ctx) =>
          // "first" has a quantifier of one, so this Iterable holds exactly one event
          val first = ctx.getEventsForPattern("first").iterator.next()
          first.bar == next.bar && next.boo == "x"
        })

which is not very clean.  It would be friendlier if you could do something
like:

    Pattern
      .begin[Foo]("first")
        .where( first => first.baz == 1 )
      .followedBy("next")
        .relatedTo("first", { (first, next) => first.bar == next.bar })
        .where( next => next.boo == "x" )
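As a rough sketch of how such a method might desugar into today's iterative conditions, the plain-Scala model below defines a hypothetical relatedTo helper. Ctx, Foo and nextCondition are also hypothetical: Ctx only mimics the getEventsForPattern call used above and is not Flink's Context type.

```scala
object RelatedToSketch {
  // Hypothetical stand-in for the CEP condition context, reduced to the one call used here.
  trait Ctx[T] { def getEventsForPattern(name: String): Iterable[T] }

  // Hypothetical helper: lifts a pairwise relation into a condition with the same
  // (event, ctx) shape as the lambda passed to where(...) above.
  def relatedTo[T](name: String, rel: (T, T) => Boolean): (T, Ctx[T]) => Boolean =
    (next, ctx) => ctx.getEventsForPattern(name).exists(prev => rel(prev, next))

  // Field names follow the example above; the field types are assumptions.
  final case class Foo(baz: Int, bar: String, boo: String)

  // Equivalent of the hand-written condition on "next": related to "first" and boo == "x".
  def nextCondition(next: Foo, ctx: Ctx[Foo]): Boolean =
    relatedTo[Foo]("first", (first, n) => first.bar == n.bar)(next, ctx) && next.boo == "x"
}
```

With the default quantifier of one, the "first" Iterable holds a single event per branch, so the exists call amounts to comparing against that one event.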



On Thu, Apr 27, 2017 at 1:21 AM, Kostas Kloudas <k.kloudas@data-artisans.com> wrote:

> Glad that this is not a blocker for you and
> you are right that we should clarify it better in the documentation.
>

Re: CEP join across events

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Elias,

Glad that this is not a blocker for you and 
you are right that we should clarify it better in the documentation.

Thanks,
Kostas

> On Apr 27, 2017, at 3:28 AM, Elias Levy <fe...@gmail.com> wrote:
> 
> You are correct.  Apologies for the confusion.  Given that ctx.getEventsForPattern returns an Iterable instead of a single value, and that the example in the documentation deals with summing multiple matches, I got the impression that the call would return all previous matches instead of only the match belonging to each branch.
> 
> I suppose it returns an Iterable to support patterns where the event has an associated quantifier, like times(), zeroOrMore(), or oneOrMore(), yes?
> 
> It might be helpful to clarify this and point out that the Iterable will contain a single value in the common case of a pattern with a quantifier of one, which is the default.


Re: CEP join across events

Posted by Elias Levy <fe...@gmail.com>.
You are correct.  Apologies for the confusion.  Given
that ctx.getEventsForPattern returns an Iterable instead of a single value, and
that the example in the documentation deals with summing multiple matches,
I got the impression that the call would return all previous matches
instead of only the match belonging to each branch.

I suppose it returns an Iterable to support patterns where the event has
an associated quantifier, like times(), zeroOrMore(), or oneOrMore(), yes?

It might be helpful to clarify this and point out that the Iterable will
contain a single value in the common case of a pattern with a quantifier of
one, which is the default.
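Assuming the Java API, whose condition context returns a java.lang.Iterable, a small hypothetical helper (SingleEvent.onlyEvent is not part of Flink) can make the quantifier-of-one case explicit:

```scala
object SingleEvent {
  // Hypothetical helper: returns the only event bound to `name` in this branch,
  // failing loudly if the pattern matched zero or several events (for example
  // because it was given a quantifier such as times(), oneOrMore() or zeroOrMore()).
  def onlyEvent[T](events: java.lang.Iterable[T], name: String): T = {
    val it = events.iterator()
    require(it.hasNext, s"no event bound to pattern '$name'")
    val first = it.next()
    require(!it.hasNext, s"more than one event bound to pattern '$name'")
    first
  }
}
```

Compared with a bare .iterator.next(), it reports an empty Iterable with a message naming the pattern and refuses to silently pick one of several events. With the Scala API the argument would be a Scala Iterable, so the signature would need adapting.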


On Wed, Apr 26, 2017 at 2:15 AM, Kostas Kloudas <k.kloudas@data-artisans.com> wrote:

> Hi Elias,
>
> If I understand correctly your use case, you want for an input:
>
> event_1 = (type=1, value_a=K, value_b=X)
> event_2 = (type=2, value_a=K, value_b=X)
> event_3 = (type=1, value_a=K, value_b=Y)
>
> to get a match:
>
> event_1, event_2
>
> and discard event_3, right?
>
> In this case, Dawid is correct and from a first look at your code, it
> should work.
> If not, what is the output that you get?
>
> Kostas

Re: CEP join across events

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Elias,

If I understand correctly your use case, you want for an input:

event_1 = (type=1, value_a=K, value_b=X)
event_2 = (type=2, value_a=K, value_b=X)
event_3 = (type=1, value_a=K, value_b=Y)

to get a match:

event_1, event_2

and discard event_3, right?

In this case, Dawid is correct and from a first look at your code, it should work.
If not, what is the output that you get?

Kostas


> On Apr 26, 2017, at 8:39 AM, Dawid Wysakowicz <wy...@gmail.com> wrote:
> 
> Hi Elias,
> 
> You can do it with 1.3 and IterativeConditions. The method ctx.getEventsForPattern("foo") returns only those events that were matched by the "foo" pattern in that particular branch.
> I mean that for a sequence like (type=1, value_b=X); (type=1, value_b=Y); (type=2, value_b=X), each of the two type=1 events creates a separate pattern branch, and the type=2 event will be checked for a match twice, once for each of those branches.
> 
> Regards,
> Dawid


Re: CEP join across events

Posted by Dawid Wysakowicz <wy...@gmail.com>.
Hi Elias,

You can do it with 1.3 and IterativeConditions. The method
ctx.getEventsForPattern("foo") returns only those events that were matched
by the "foo" pattern in that particular branch.
I mean that for a sequence like (type=1, value_b=X); (type=1,
value_b=Y); (type=2, value_b=X), each of the two type=1 events creates a separate
pattern branch, and the type=2 event will be checked for a match
twice, once for each of those branches.
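The branching described above can be illustrated with a toy model in plain Scala. It is not Flink's NFA: BranchModel, Branch, barCondition and run are hypothetical names, and the model assumes all events share one key, ignores time windows, and lets each branch complete at its first matching "bar" event (relaxed contiguity).

```scala
object BranchModel {
  // `type` is backticked because it is a reserved word in Scala.
  final case class Event(`type`: Int, value_a: String, value_b: String)

  // One partial match (branch): the events bound to "foo" in this branch only.
  final case class Branch(foo: List[Event])

  // Mirrors the iterative condition from the thread; `fooEvents` plays the role
  // of ctx.getEventsForPattern("foo") for a single branch.
  def barCondition(v: Event, fooEvents: Iterable[Event]): Boolean =
    v.`type` == 2 && fooEvents.exists(prev => prev.value_b == v.value_b)

  // Returns completed (foo, bar) matches in the order they complete.
  def run(events: Seq[Event]): List[(Event, Event)] = {
    var open = List.empty[Branch] // branches still waiting for "bar"
    val done = List.newBuilder[(Event, Event)]
    for (e <- events) {
      // Test the event against each open branch separately.
      val (matched, waiting) = open.partition(b => barCondition(e, b.foo))
      matched.foreach(b => done += ((b.foo.head, e)))
      open = waiting
      // Every type-1 event starts a new branch of its own.
      if (e.`type` == 1) open = open :+ Branch(List(e))
    }
    done.result()
  }
}
```

On the sequence (type=1, value_b=X); (type=1, value_b=Y); (type=2, value_b=X), run returns only the pair of X events: the type=2 event is also tested against the Y branch, but that branch's own "foo" list contains only the Y event, so the condition rejects it there.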

Regards,
Dawid

2017-04-26 7:48 GMT+02:00 Elias Levy <fe...@gmail.com>:

> There doesn't appear to be a way to join events across conditions using
> the CEP library.
>
> Consider events of the form (type, value_a, value_b) on a stream keyed by
> the value_a field.
>
> Under 1.2 you can create a pattern that, for a given value_a (as specified
> by the stream key), matches if an event of type 1 is followed by an
> event of type 2 (e.g. begin("foo").where(_.type==1).
> followedBy("bar").where(_.type==2)).  But this will return a match
> regardless of whether value_b in the first event matches value_b in the
> second event.
>
> The 1.3 snapshot introduces iterative conditions, but this is insufficient.
> In 1.3 you can do:
>
> begin("foo").where(_.type==1).followedBy("bar").where(
>     (v, ctx) => {
>        v.type == 2 &&
>        ctx.getEventsForPattern("foo").asScala.exists(prev => prev.value_b == v.value_b)
>     })
>
> This will accept the current event if any previous event had a value_b
> that matches the current event's. But the matches will include all previous
> events, even those whose value_b did not match the current event's, instead
> of only the previous event whose value_b equals the current event's.
>
> Is there a way to output only the match where the previous event matches the
> current event's value_b (e.g. foo == (type=1, value_a=K, value_b=X) and bar
> == (type=2, value_a=K, value_b=X))?
>
>
>