Posted to user@flink.apache.org by Chao Wang <ch...@wustl.edu> on 2017/07/26 20:45:50 UTC

CEP condition expression and its event consuming strategy

Hi,

I have two questions regarding the use of the Flink CEP library 
(flink-cep_2.11:1.3.1), as follows:

1. How can I use the API to express "emit event C in the presence of
events A and B, with no restriction on the arrival order of A and B"?
I tried creating two patterns, one for "A and then B" and the other for
"B and then A", and then using two patternStreams to handle each case,
each of which emits C. It worked, but this approach seems redundant to
me.

2. Given the above objective, how can I consume the accepted events so
that they are not used for future matches? For example, with the
arriving sequence {A, B, A}, CEP should emit only one C (due to the
matching of {A,B}), not two Cs (due to {A,B} and {B,A}). Similarly,
with the arriving sequence {B, A, B, A}, CEP should emit only two Cs,
not three.


Thanks,

Chao
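
For illustration, the two behaviours contrasted in question 2 can be sketched in plain Java, without any Flink dependency. This is only a simplified model of the desired semantics, not how the CEP library works internally. The class and method names are hypothetical, the stream is assumed to contain only "A" and "B" events, and keeping the earlier event when two events of the same type arrive in a row is an assumption.

```java
import java.util.Arrays;
import java.util.List;

class ConsumeVsReuse {

    /** Counts matches when every event may start a new match (events are reused). */
    static int countWithReuse(List<String> events) {
        int matches = 0;
        for (int i = 0; i < events.size(); i++) {
            // Look for the first later event of the other type.
            for (int j = i + 1; j < events.size(); j++) {
                if (!events.get(j).equals(events.get(i))) {
                    matches++;
                    break;
                }
            }
        }
        return matches;
    }

    /** Counts matches when the events of an emitted match are discarded. */
    static int countWithConsume(List<String> events) {
        int matches = 0;
        String pending = null; // the unmatched event waiting for its partner
        for (String e : events) {
            if (pending == null) {
                pending = e;
            } else if (!pending.equals(e)) {
                matches++;      // emit one C
                pending = null; // both events are consumed
            }
            // Assumption: an event of the same type as the pending one is ignored.
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> first = Arrays.asList("A", "B", "A");
        List<String> second = Arrays.asList("B", "A", "B", "A");
        System.out.println(countWithReuse(first) + " vs " + countWithConsume(first));   // 2 vs 1
        System.out.println(countWithReuse(second) + " vs " + countWithConsume(second)); // 3 vs 2
    }
}
```

The first number in each printed pair corresponds to the unwanted behaviour described above, the second to the requested one.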


Re: CEP condition expression and its event consuming strategy

Posted by Chao Wang <ch...@wustl.edu>.
Thank you, Dawid. FYI, I've implemented the discarding logic with a
CoFlatMapFunction, for the special case where there are only two input
streams: I maintain a logical state (no match, input1 matched, or input2
matched) and use private variables to store the event matched so far,
which waits to be processed along with the event from the other input
source.

Chao
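
A minimal sketch of the kind of state machine described above, written in plain Java so that it runs without Flink. It is not the actual implementation from this thread. The class name, the String event type, and the method names onInput1/onInput2 are hypothetical; in a Flink job these two methods would roughly correspond to flatMap1 and flatMap2 of a CoFlatMapFunction. Dropping a second event from the same input while one is already stored is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

class TwoInputPairing {

    enum State { NO_MATCH, INPUT1_MATCHED, INPUT2_MATCHED }

    private State state = State.NO_MATCH;
    private String stored; // the matched event waiting for its partner
    private final List<String> emitted = new ArrayList<>();

    /** An event arrived on the first input. */
    void onInput1(String a) {
        if (state == State.INPUT2_MATCHED) {
            emit(a, stored);
        } else if (state == State.NO_MATCH) {
            stored = a;
            state = State.INPUT1_MATCHED;
        }
        // INPUT1_MATCHED: (assumption) keep the earlier event, drop this one.
    }

    /** An event arrived on the second input. */
    void onInput2(String b) {
        if (state == State.INPUT1_MATCHED) {
            emit(stored, b);
        } else if (state == State.NO_MATCH) {
            stored = b;
            state = State.INPUT2_MATCHED;
        }
        // INPUT2_MATCHED: (assumption) keep the earlier event, drop this one.
    }

    /** Emits one output and discards both matched events. */
    private void emit(String a, String b) {
        emitted.add("C(" + a + "," + b + ")");
        stored = null;
        state = State.NO_MATCH;
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        TwoInputPairing p = new TwoInputPairing();
        // Arrival order B, A, B, A across the two inputs.
        p.onInput2("b1");
        p.onInput1("a1");
        p.onInput2("b2");
        p.onInput1("a2");
        System.out.println(p.emitted()); // [C(a1,b1), C(a2,b2)]
    }
}
```

Because each emitted pair resets the state, no event takes part in more than one output, which is the discarding behaviour asked for in the original question.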


On 07/31/2017 02:13 AM, Dawid Wysakowicz wrote:
> Ad. 1 Yes, it returns an Iterable to support times and oneOrMore patterns (which can accept more than one event).
>
> Ad. 2 One use case for not discarding used events is looking for shapes in the data, e.g. W-shapes. In this case one W-shape could start on the middle peak of the previous one.
>
> Unfortunately, I personally can't point you to any in-use applications. Maybe Kostas, whom I've added to the discussion, knows of some.
>
> Anyway, thanks for your interest in the CEP library. We will be happy to hear any comments and suggestions for future improvements.


Re: CEP condition expression and its event consuming strategy

Posted by Dawid Wysakowicz <wy...@gmail.com>.
Ad. 1 Yes, it returns an Iterable to support times and oneOrMore patterns (which can accept more than one event).

Ad. 2 One use case for not discarding used events is looking for shapes in the data, e.g. W-shapes. In this case one W-shape could start on the middle peak of the previous one.

Unfortunately, I personally can't point you to any in-use applications. Maybe Kostas, whom I've added to the discussion, knows of some.

Anyway, thanks for your interest in the CEP library. We will be happy to hear any comments and suggestions for future improvements.
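
To make the W-shape example concrete, here is a small plain-Java sketch (no Flink dependency) that searches a numeric series for W-shapes with and without discarding matched samples. The definition of a W-shape as five consecutive samples going high, low, high, low, high is an assumption made only for this illustration, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class WShapes {

    /**
     * Returns the start indices of all W-shapes in v.
     * If discardMatched is true, the samples of a match are not reused,
     * so the search resumes after the last sample of the match.
     */
    static List<Integer> findWShapes(int[] v, boolean discardMatched) {
        List<Integer> starts = new ArrayList<>();
        int i = 0;
        while (i + 4 < v.length) {
            boolean isW = v[i] > v[i + 1] && v[i + 1] < v[i + 2]
                    && v[i + 2] > v[i + 3] && v[i + 3] < v[i + 4];
            if (isW) {
                starts.add(i);
                i += discardMatched ? 5 : 1;
            } else {
                i++;
            }
        }
        return starts;
    }

    public static void main(String[] args) {
        int[] series = {5, 1, 4, 1, 5, 1, 4, 1, 5};
        System.out.println(findWShapes(series, false)); // [0, 2, 4]
        System.out.println(findWShapes(series, true));  // [0]
    }
}
```

With reuse, the second W-shape starts at index 2, which is the middle peak of the first one. With discarding, that overlapping shape and the one after it are never reported.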



> On 28 Jul 2017, at 21:54, Chao Wang <ch...@wustl.edu> wrote:
> 
> Hi Dawid,
> 
> Thank you.
> 
> Ad. 1 I noticed that the method getEventsForPattern() returns an Iterable<T> and we need to further invoke .iterator().next() to get access to the event value.
>
> Ad. 2 Here is a bit about a use case we have that calls for such discarding semantics. In the event processing project I am currently working on, input event streams are sensor data, and we join streams and do Kalman filtering, FFT, etc. We therefore choose to discard the accepted events once the data they carry have been processed; otherwise, it may cause duplicated processing as well as incorrect join semantics.
>
> We came up with this question while doing an empirical comparison of Flink and our system (implemented with the TAO real-time event service). We implemented such semantics in our system by removing input events once CEP emits the corresponding output events.
>
> Could you provide some use cases where the discarding semantics are not needed? I guess I am wired into processing sensor data and thus cannot think of a case where reusing accepted events would be of interest. Also, could you share some pointers to streaming applications in use? We are seeking to make our research work more relevant to current practice.
> 
> Thank you very much,
> 
> Chao
> 


Re: CEP condition expression and its event consuming strategy

Posted by Chao Wang <ch...@wustl.edu>.
Hi Dawid,

Thank you.

Ad. 1 I noticed that the method getEventsForPattern() returns an
Iterable<T> and we need to further invoke .iterator().next() to get
access to the event value.

Ad. 2 Here is a bit about a use case we have that calls for such
discarding semantics. In the event processing project I am currently
working on, input event streams are sensor data, and we join streams and
do Kalman filtering, FFT, etc. We therefore choose to discard the
accepted events once the data they carry have been processed; otherwise,
it may cause duplicated processing as well as incorrect join semantics.

We came up with this question while doing an empirical comparison of
Flink and our system (implemented with the TAO real-time event service).
We implemented such semantics in our system by removing input events
once CEP emits the corresponding output events.

Could you provide some use cases where the discarding semantics are not
needed? I guess I am wired into processing sensor data and thus cannot
think of a case where reusing accepted events would be of interest.
Also, could you share some pointers to streaming applications in use? We
are seeking to make our research work more relevant to current practice.

Thank you very much,

Chao

On 07/27/2017 02:17 AM, Dawid Wysakowicz wrote:
> Hi Chao,
>
> Ad. 1 You could implement it with IterativeCondition. Something like this:
>
> import org.apache.flink.cep.pattern.Pattern;
> import org.apache.flink.cep.pattern.conditions.IterativeCondition;
> import org.apache.flink.cep.pattern.conditions.SimpleCondition;
>
> // Assumption: Event is a placeholder type whose equals("A") means "is an A event".
> Pattern<Event, ?> pattern = Pattern.<Event>begin("first").where(new SimpleCondition<Event>() {
>     @Override
>     public boolean filter(Event value) throws Exception {
>        return value.equals("A") || value.equals("B");
>     }
> }).followedBy("second").where(new IterativeCondition<Event>() {
>     @Override
>     public boolean filter(Event value, Context<Event> ctx) throws Exception {
>        // getEventsForPattern returns an Iterable, so take the single event accepted by "first".
>        Event first = ctx.getEventsForPattern("first").iterator().next();
>        return (value.equals("A") || value.equals("B")) && !value.equals(first);
>     }
> });
>
> Ad. 2 Unfortunately, as you said, right now the pattern restarts on each subsequent event and it is not possible to change that strategy. There is ongoing work to introduce AfterMatchSkipStrategy [1], but at best it will be merged in 1.4.0. I have not given it much thought, but I would try to implement some discarding logic.
>
> Regards,
> Dawid
>
> [1] https://issues.apache.org/jira/browse/FLINK-7169


Re: CEP condition expression and its event consuming strategy

Posted by Dawid Wysakowicz <wy...@gmail.com>.
Hi Chao,

Ad. 1 You could implement it with IterativeCondition. Something like this:

import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;

// Assumption: Event is a placeholder type whose equals("A") means "is an A event".
Pattern<Event, ?> pattern = Pattern.<Event>begin("first").where(new SimpleCondition<Event>() {
   @Override
   public boolean filter(Event value) throws Exception {
      return value.equals("A") || value.equals("B");
   }
}).followedBy("second").where(new IterativeCondition<Event>() {
   @Override
   public boolean filter(Event value, Context<Event> ctx) throws Exception {
      // getEventsForPattern returns an Iterable, so take the single event accepted by "first".
      Event first = ctx.getEventsForPattern("first").iterator().next();
      return (value.equals("A") || value.equals("B")) && !value.equals(first);
   }
});

Ad. 2 Unfortunately, as you said, right now the pattern restarts on each subsequent event and it is not possible to change that strategy. There is ongoing work to introduce AfterMatchSkipStrategy [1], but at best it will be merged in 1.4.0. I have not given it much thought, but I would try to implement some discarding logic.

Regards,
Dawid

[1] https://issues.apache.org/jira/browse/FLINK-7169

> On 26 Jul 2017, at 22:45, Chao Wang <ch...@wustl.edu> wrote:
> 
> Hi,
> 
> I have two questions regarding the use of the Flink CEP library (flink-cep_2.11:1.3.1), as follows:
> 
> 1. How can I use the API to express "emit event C in the presence of events A and B, with no restriction on the arrival order of A and B"? I tried creating two patterns, one for "A and then B" and the other for "B and then A", and then using two patternStreams to handle each case, each of which emits C. It worked, but this approach seems redundant to me.
>
> 2. Given the above objective, how can I consume the accepted events so that they are not used for future matches? For example, with the arriving sequence {A, B, A}, CEP should emit only one C (due to the matching of {A,B}), not two Cs (due to {A,B} and {B,A}). Similarly, with the arriving sequence {B, A, B, A}, CEP should emit only two Cs, not three.
> 
> 
> Thanks,
> 
> Chao
>