Posted to user@flink.apache.org by Shailesh Jain <sh...@stellapps.com> on 2018/03/14 13:20:58 UTC

[Proposal] CEP library changes - review request

Hi,

We've been facing issues* because watermarks are not supported per key, which
has led us to either:

(a) run the job in processing time on a KeyedStream, which compromises on use
cases that revolve around catching time-based patterns, or
(b) run the job in event time on multiple data streams (one data stream per
key), which does not scale because the number of operators grows linearly
with the number of keys.

To address this, we've made a quick proof-of-concept (PoC) change in the
AbstractKeyedCEPPatternOperator that allows the NFAs to progress based on
timestamps extracted from the events arriving at the operator (and not
from the watermarks). We've tested it against our use case and are seeing a
significant improvement in memory usage without compromising on the
watermark functionality.
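
As a rough illustration of the idea only (this is not code from the PoC
branch, and it does not use any Flink API), per-key event-time progress can be
sketched in plain Java. The class name PerKeyClock and its methods are
hypothetical, and keys are assumed to be strings:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: tracks event-time progress separately for each key. */
final class PerKeyClock {
    // Highest event timestamp seen so far, per key.
    private final Map<String, Long> maxTimestampByKey = new HashMap<>();

    /**
     * Records an event timestamp for a key and returns that key's current
     * event time, i.e. the highest timestamp seen for the key so far.
     */
    long advance(String key, long eventTimestamp) {
        return maxTimestampByKey.merge(key, eventTimestamp, Long::max);
    }

    /** Returns the key's current event time, or Long.MIN_VALUE if the key is unseen. */
    long currentTime(String key) {
        return maxTimestampByKey.getOrDefault(key, Long.MIN_VALUE);
    }
}
```

In this sketch a slow or disconnected key never holds back the clock of any
other key, which is the property a single operator-wide watermark cannot give.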

It'll be really helpful if someone from the cep dev group can take a look
at this branch - https://github.com/jainshailesh/flink/commits/cep_changes
and provide comments on the approach taken, and maybe guide us on the next
steps for taking it forward.

Thanks,
Shailesh

* Links to previous email threads related to the same issue:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Question-on-event-time-functionality-using-Flink-in-a-IoT-usecase-td18653.html
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Generate-watermarks-per-key-in-a-KeyedStream-td16629.html
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Correlation-between-number-of-operators-and-Job-manager-memory-requirements-td18384.html

Re: [Proposal] CEP library changes - review request

Posted by Shailesh Jain <sh...@stellapps.com>.
Thank you, Kostas, for reviewing this.

Although points 1 and 3 are something which I was planning to address in
the actual implementation, #2 would still be a show stopper.

I'll spend some more time on this and maybe come up with a better way to
achieve the same use case without mixing the two notions of time.

Until then I hope it is OK if we use the modified library to unblock
ourselves.

Thanks,
Shailesh

On Tue, Apr 3, 2018 at 3:05 PM, Kostas Kloudas <k....@data-artisans.com>
wrote:

> Hi Shailesh,
>
> Your solution may fit your use case, but as Dawid mentioned earlier, it
> makes a lot of assumptions about the input.
>
> From a look at your PoC:
> 1) You assume no late data (you do not drop anything) and no
>    out-of-orderness.
> 2) You mix the two notions of time (event and processing).
> 3) You eagerly process each element, which can have performance
>    implications, especially if you go for the RocksDB backend.
>
> Given the above, I do not think that this can go in Flink.
>
> Something that goes into Flink will have to be maintained by the community.
> So, although some use cases may have particular needs, we refrain from
> adding code to master that makes assumptions tailored to specific use cases.
>
> I understand that the one watermark per key could conceptually fit better
> in your use case, but there may be a better way to achieve your goal, one
> that aligns with Flink’s offered semantics.
>
> Thanks,
> Kostas

Re: [Proposal] CEP library changes - review request

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Shailesh,

Your solution may fit your use case, but as Dawid mentioned earlier, it makes a lot of 
assumptions about the input. 

From a look at your PoC:
1) You assume no late data (you do not drop anything) and no out-of-orderness.
2) You mix the two notions of time (event and processing).
3) You eagerly process each element, which can have performance implications,
    especially if you go for the RocksDB backend.
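
To make point 1 concrete, here is a minimal plain-Java sketch of what
"handling late data with bounded out-of-orderness" could look like per key.
It is not Flink code: the class LatenessFilter, the parameter
maxOutOfOrdernessMillis and the strict less-than comparison are assumptions
made for this illustration only.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical filter: rejects events that lag too far behind their key's progress. */
final class LatenessFilter {
    private final long maxOutOfOrdernessMillis;
    // Highest event timestamp accepted so far, per key.
    private final Map<String, Long> maxTimestampByKey = new HashMap<>();

    LatenessFilter(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    /**
     * Returns true if the event should be processed, or false if it is late,
     * i.e. older than (highest timestamp seen for the key - allowed bound).
     */
    boolean accept(String key, long eventTimestamp) {
        long maxSeen = maxTimestampByKey.getOrDefault(key, Long.MIN_VALUE);
        // The first event of a key is always accepted; this also avoids
        // underflow when subtracting from Long.MIN_VALUE.
        if (maxSeen != Long.MIN_VALUE
                && eventTimestamp < maxSeen - maxOutOfOrdernessMillis) {
            return false; // late event: drop it and leave the key's clock unchanged
        }
        maxTimestampByKey.merge(key, eventTimestamp, Long::max);
        return true;
    }
}
```

With a bound of 10 ms, an event stamped 95 is still accepted after one stamped
100, while an event stamped 89 is dropped.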

Given the above, I do not think that this can go in Flink.
 
Something that goes into Flink will have to be maintained by the community.
So, although some use cases may have particular needs, we refrain from adding
code to master that makes assumptions tailored to specific use cases.

I understand that the one watermark per key could conceptually fit better in your use case, 
but there may be a better way to achieve your goal, one that aligns with Flink’s offered 
semantics.

Thanks, 
Kostas

> On Apr 3, 2018, at 11:01 AM, Shailesh Jain <sh...@stellapps.com> wrote:
> 
> Bump.
> 
> On Thu, Mar 22, 2018 at 7:54 PM, Shailesh Jain <sh...@stellapps.com>
> wrote:
> 
>> To trigger the computations for each batch, I'll have to use the
>> processing time timer in the abstract keyed cep operator, right?
>> 
>> The reason why I'm avoiding the watermarks is that it is not possible to
>> generate watermarks per key.
>> 
>> Thanks for the 'within' remark.
>> 
>> A couple of questions:
>> 
>> 1. Given our use case and the limitations of per key watermark, do you
>> think that this approach is worth adding to the library?
>> 
>> 2. What other aspects of the framework do I need to consider/test before
>> we go about implementing this approach formally?
>> 
>> Thanks,
>> Shailesh
>> 
>> 
>> On Thu 22 Mar, 2018, 6:58 PM Dawid Wysakowicz, <wy...@gmail.com>
>> wrote:
>> 
>>> If you do the buffering, you can emit a watermark for each such batch (equal
>>> to the highest timestamp in that batch). This way you won’t need to sort;
>>> the CEP library will do it for you.
>>> The within clause will then work in event time.
>>>
>>> One more remark: the within clause always applies to the whole pattern, not
>>> just to a part of it. It does not matter whether you apply it in the middle
>>> (as you did) or at the very end.
>>> 
>>> Best,
>>> Dawid
>>> 
>>>> On 19 Mar 2018, at 11:31, Shailesh Jain <sh...@stellapps.com>
>>> wrote:
>>>> 
>>>> Thanks for your reply, Dawid.
>>>>
>>>> I understand that the approach I've tried out is not generic enough and
>>>> would need a lot more thought w.r.t. parallelism considerations,
>>>> out-of-order events, effects on downstream operators, etc. The intention
>>>> was to do a quick implementation to check the feasibility of the approach.
>>>>
>>>>>> It will also not sort the events etc.
>>>>
>>>> In the application code to test this approach, I used a global window to
>>>> sort events based on their timestamp (similar to how out-of-order events
>>>> are dropped based on a time bound, I'm dropping them based on a
>>>> count-based bound).
>>>>
>>>> allEvents = allEvents
>>>>       .keyBy(event -> event.getAttributeValue(EVENT_SOURCE_ID))
>>>>       .window(GlobalWindows.create())
>>>>       .trigger(new GlobalWindowCountTrigger(propLoader.getSortWindowSize()))
>>>>       .process(new SortWindowProcessFunction())
>>>>       .keyBy(event -> event.getAttributeValue(EVENT_SOURCE_ID))
>>>>       .assignTimestampsAndWatermarks(new TimestampsExtractor())
>>>>       .uid(Constants.TS_EX_UID);
>>>> PatternLoader
>>>>       .applyPatterns(allEvents, propLoader.getPatternClassNames())
>>>>       .addSink(createKafkaSink(kafkaProps))
>>>>       .uid(Constants.KAFKA_SINK_COMPLEX_EVENTS_UID);
>>>>
>>>>
>>>>>> If in the getCurrentWatermark method of your
>>>>>> AssignerWithPeriodicWatermarks you will just return
>>>>>> new Watermark(System.currentTimeMillis()), you will get the same
>>>>>> behaviour as with that change, am I right?
>>>>
>>>> If watermarks are generated based on the machine time, the major issue I
>>>> see is that we will not be able to leverage event-time functionality,
>>>> specifically for patterns that look for the absence of an event for a
>>>> fixed period of time.
>>>>
>>>> For example, we have many patterns like this:
>>>>
>>>> Pattern<Event, Event> pattern = Pattern
>>>>       .<Event>begin(UNDER_CHILLED_ALERT,
>>>>               AfterMatchSkipStrategy.skipPastLastEvent())
>>>>       .where(Conditions.getUnderchilledCondition())
>>>>       .notFollowedBy(COMPRESSOR_ON)
>>>>       .where(Conditions.getCompressorOnCondition())
>>>>       .within(Time.minutes(30))
>>>>       .followedBy(HIGH_TEMP)
>>>>       .where(Conditions.getHighTemperatureCondition());
>>>>
>>>> Now, when there are network issues (which are very frequent), queued
>>>> events are delivered together, and such patterns will not be matched
>>>> correctly, because events are pruned from the NFA's buffer based on the
>>>> watermark received by the operator rather than on the timestamp within
>>>> the event.
>>>> 
>>>> Is my understanding here correct?
>>>> 
>>>> Thanks,
>>>> Shailesh
>>>> 
>>>> On Mon, Mar 19, 2018 at 3:17 PM, Dawid Wysakowicz <
>>>> wysakowicz.dawid@gmail.com> wrote:
>>>> 
>>>>> Hi Shailesh,
>>>>> 
>>>>> Thanks for your interest in the CEP library, and sorry for the late
>>>>> response. I must say I am not a fan of this approach.
>>>>> After this change, processing time is no longer really processing time,
>>>>> plus it will work differently from any other place in Flink. It will
>>>>> also not sort the events, etc.
>>>>> Moreover, I think you could achieve a pretty similar solution if you
>>>>> generate your watermark based on the machine time. If, in the
>>>>> getCurrentWatermark method of your AssignerWithPeriodicWatermarks, you
>>>>> just return new Watermark(System.currentTimeMillis()), you will get the
>>>>> same behaviour as with that change, am I right?
>>>>> 
>>>>> Best,
>>>>> Dawid
>>>>> 
>>>>>> On 18 Mar 2018, at 09:00, Shailesh Jain <sh...@stellapps.com>
>>>>> wrote:
>>>>>> 
>>>>>> Thanks Aljoscha.
>>>>>> 
>>>>>> Bump.
>>>>>> 
>>>>>> I understand everyone would be busy with 1.5.0, but would really
>>>>>> appreciate slight help in unblocking us here.
>>>>>>
>>>>>> Thanks,
>>>>>> Shailesh
>>>>>>
>>>>>> On Thu, Mar 15, 2018 at 1:47 AM, Aljoscha Krettek <aljoscha@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I think this should have been sent to the dev mailing list because in
>>>>>>> the user mailing list it might disappear among a lot of other mail.
>>>>>>> 
>>>>>>> Forwarding...
>>>>>>> 
>>>>>>> Best,
>>>>>>> Aljoscha


Re: [Proposal] CEP library changes - review request

Posted by Shailesh Jain <sh...@stellapps.com>.
Bump.

On Thu, Mar 22, 2018 at 7:54 PM, Shailesh Jain <sh...@stellapps.com>
wrote:

> To trigger the computations for each batch, I'll have to use the
> processing time timer in the abstract keyed cep operator, right?
>
> The reason why I'm avoiding the watermarks is that it is not possible to
> generate watermarks per key.
>
> Thanks for the 'within' remark.
>
> A couple of questions:
>
> 1. Given our use case and the limitations of per key watermark, do you
> think that this approach is worth adding to the library?
>
> 2. What other aspects of the framework do I need to consider/test before
> we go about implementing this approach formally?
>
> Thanks,
> Shailesh

Re: [Proposal] CEP library changes - review request

Posted by Shailesh Jain <sh...@stellapps.com>.
To trigger the computations for each batch, I'll have to use the processing
time timer in the abstract keyed cep operator, right?

The reason why I'm avoiding the watermarks is that it is not possible to
generate watermarks per key.
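
For reference, the batching suggestion quoted below (buffer events, then emit
a watermark equal to the highest timestamp in the batch) can be sketched in
plain Java, independent of Flink. This is only an illustration under
assumptions: the class BatchWatermarker, the count-based flush and the use of
null for "no watermark yet" are all hypothetical choices, not part of the CEP
library.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical count-based buffer that reports one watermark per flushed batch. */
final class BatchWatermarker {
    private final int batchSize;
    private final List<Long> buffer = new ArrayList<>();

    BatchWatermarker(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    /**
     * Buffers one event timestamp. When the batch is full, clears the buffer
     * and returns the batch watermark (the highest timestamp in the batch);
     * otherwise returns null.
     */
    Long add(long eventTimestamp) {
        buffer.add(eventTimestamp);
        if (buffer.size() < batchSize) {
            return null; // batch not complete yet, no watermark to emit
        }
        long watermark = Long.MIN_VALUE;
        for (long ts : buffer) {
            watermark = Math.max(watermark, ts);
        }
        buffer.clear();
        return watermark;
    }
}
```

Because the watermark is derived from the batch contents, the events inside a
batch do not have to be sorted before it is computed.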

Thanks for the 'within' remark.

A couple of questions:

1. Given our use case and the limitations of per key watermark, do you
think that this approach is worth adding to the library?

2. What other aspects of the framework do I need to consider/test before we
go about implementing this approach formally?

Thanks,
Shailesh

On Thu 22 Mar, 2018, 6:58 PM Dawid Wysakowicz, <wy...@gmail.com>
wrote:

> If you do the buffering, you can emit a watermark for each such batch (equal
> to the highest timestamp in that batch). This way you won’t need to sort;
> the CEP library will do it for you.
> The within clause will then work in event time.
>
> One more remark: the within clause always applies to the whole pattern, not
> just to a part of it. It does not matter whether you apply it in the middle
> (as you did) or at the very end.
>
> Best,
> Dawid
>
> > On 19 Mar 2018, at 11:31, Shailesh Jain <sh...@stellapps.com>
> wrote:
> >
> > Thanks for your reply, Dawid.
> >
> > I understand that the approach I've tried out is not generic enough and
> > would need a lot more thought w.r.t. parallelism considerations,
> > out-of-order events, effects on downstream operators, etc. The intention
> > was to do a quick implementation to check the feasibility of the approach.
> >
> >>> It will also not sort the events etc.
> >
> > In the application code to test this approach, I used a global window to
> > sort events based on their timestamp (similar to how out-of-order events
> > are dropped based on a time bound, I'm dropping them based on a
> > count-based bound).
> >
> > allEvents = allEvents
> >        .keyBy(event -> event.getAttributeValue(EVENT_SOURCE_ID))
> >        .window(GlobalWindows.create())
> >        .trigger(new GlobalWindowCountTrigger(propLoader.getSortWindowSize()))
> >        .process(new SortWindowProcessFunction())
> >        .keyBy(event -> event.getAttributeValue(EVENT_SOURCE_ID))
> >        .assignTimestampsAndWatermarks(new TimestampsExtractor())
> >        .uid(Constants.TS_EX_UID);
> > PatternLoader
> >        .applyPatterns(allEvents, propLoader.getPatternClassNames())
> >        .addSink(createKafkaSink(kafkaProps))
> >        .uid(Constants.KAFKA_SINK_COMPLEX_EVENTS_UID);
> >
> >
> >>> If in the getCurrentWatermark method of your
> >>> AssignerWithPeriodicWatermarks you will just return
> >>> new Watermark(System.currentTimeMillis()), you will get the same
> >>> behaviour as with that change, am I right?
> >
> > If watermarks are generated based on the machine time, the major issue I
> > see is that we will not be able to leverage event-time functionality,
> > specifically for patterns that look for the absence of an event for a
> > fixed period of time.
> >
> > For example, we have many patterns like this:
> >
> > Pattern<Event, Event> pattern = Pattern
> >        .<Event>begin(UNDER_CHILLED_ALERT,
> >                AfterMatchSkipStrategy.skipPastLastEvent())
> >        .where(Conditions.getUnderchilledCondition())
> >        .notFollowedBy(COMPRESSOR_ON)
> >        .where(Conditions.getCompressorOnCondition())
> >        .within(Time.minutes(30))
> >        .followedBy(HIGH_TEMP)
> >        .where(Conditions.getHighTemperatureCondition());
> >
> > Now, when there are network issues (which are very frequent), queued
> > events are delivered together, and such patterns will not be matched
> > correctly, because events are pruned from the NFA's buffer based on the
> > watermark received by the operator rather than on the timestamp within
> > the event.
> >
> > Is my understanding here correct?
> >
> > Thanks,
> > Shailesh
> >
> > On Mon, Mar 19, 2018 at 3:17 PM, Dawid Wysakowicz <
> > wysakowicz.dawid@gmail.com> wrote:
> >
> >> Hi Shailesh,
> >>
> >> Thanks for your interest in the CEP library and sorry for the late
> >> response. I must say I am not a fan of this approach.
> >> After this change, the Processing time is no longer a processing time,
> >> plus it will work differently in any other place of Flink. It will also
> not
> >> sort the events etc.
> >> Moreover I think you could achieve pretty similar solution if you
> generate
> >> your watermark based on the machine time. If in the getCurrentWatermark
> >> method
> >> of your AssignerWithPeriodicWatermarks you will just return new
> >> Watermark(System.currentTimeMillis()), you will get the same behaviour
> as
> >> with that change, am I right?
> >>
> >> Best,
> >> Dawid
> >>
> >>> On 18 Mar 2018, at 09:00, Shailesh Jain <sh...@stellapps.com>
> >> wrote:
> >>>
> >>> Thanks Aljoscha.
> >>>
> >>> Bump.
> >>>
> >>> I understand everyone would be busy with 1.5.0, but would really
> >> appreciate
> >>> slight help in unblocking us here.
> >>>
> >>> Thanks,
> >>> Shailesh
> >>>
> >>> On Thu, Mar 15, 2018 at 1:47 AM, Aljoscha Krettek <aljoscha@apache.org
> >
> >>> wrote:
> >>>
> >>>> Hi,
> >>>>
> >>>> I think this should have been sent to the dev mailing list because in
> >> the
> >>>> user mailing list it might disappear among a lot of other mail.
> >>>>
> >>>> Forwarding...
> >>>>
> >>>> Best,
> >>>> Aljoscha
> >>>>
> >>>>> On 14. Mar 2018, at 06:20, Shailesh Jain <
> shailesh.jain@stellapps.com>
> >>>> wrote:
> >>>>>
> >>>>> Hi,
> >>>>>
> >>>>> We've been facing issues* w.r.t watermarks not supported per key,
> which
> >>>> led us to:
> >>>>>
> >>>>> Either (a) run the job in Processing time for a KeyedStream ->
> >>>> compromising on use cases which revolve around catching time-based
> >> patterns
> >>>>> or (b) run the job in Event time for multiple data streams (one data
> >>>> stream per key) -> this is not scalable as the number of operators
> grow
> >>>> linearly with the number of keys
> >>>>>
> >>>>> To address this, we've done a quick (poc) change in the
> >>>> AbstractKeyedCEPPatternOperator to allow for the NFAs to progress
> based
> >>>> on timestamps extracted from the events arriving into the operator
> (and
> >> not
> >>>> from the watermarks). We've tested it against our usecase and are
> >> seeing a
> >>>> significant improvement in memory usage without compromising on the
> >>>> watermark functionality.
> >>>>>
> >>>>> It'll be really helpful if someone from the cep dev group can take a
> >>>> look at this branch - https://github.com/jainshailesh/flink/commits/
> >>>> cep_changes <
> https://github.com/jainshailesh/flink/commits/cep_changes>
> >>>> and provide comments on the approach taken, and maybe guide us on the
> >> next
> >>>> steps for taking it forward.
> >>>>>
> >>>>> Thanks,
> >>>>> Shailesh
> >>>>>
> >>>>> * Links to previous email threads related to the same issue:
> >>>>> http://apache-flink-user-mailing-list-archive.2336050.
> >>>> n4.nabble.com/Question-on-event-time-functionality-
> >>>> using-Flink-in-a-IoT-usecase-td18653.html <http://apache-flink-user-
> >>>> mailing-list-archive.2336050.n4.nabble.com/Question-on-
> >>>> event-time-functionality-using-Flink-in-a-IoT-usecase-td18653.html>
> >>>>> http://apache-flink-user-mailing-list-archive.2336050.
> >>>>
> n4.nabble.com/Generate-watermarks-per-key-in-a-KeyedStream-td16629.html
> >> <
> >>>> http://apache-flink-user-mailing-list-archive.2336050.
> >>>>
> n4.nabble.com/Generate-watermarks-per-key-in-a-KeyedStream-td16629.html
> >>>
> >>>>> http://apache-flink-user-mailing-list-archive.2336050.
> >>>> n4.nabble.com/Correlation-between-number-of-operators-
> >>>> and-Job-manager-memory-requirements-td18384.html <
> >>>> http://apache-flink-user-mailing-list-archive.2336050.
> >>>> n4.nabble.com/Correlation-between-number-of-operators-
> >>>> and-Job-manager-memory-requirements-td18384.html>
> >>>>>
> >>>>
> >>>>
> >>
> >>
>
>

Re: [Proposal] CEP library changes - review request

Posted by Dawid Wysakowicz <wy...@gmail.com>.
If you do the buffering, you can emit a watermark for each such batch (equal to the highest timestamp in that batch). This way you won’t need to sort; the CEP library will do it for you.
The within clause will then work in event time.

One more remark: the within clause always applies to the whole pattern, not just to a part of it. It does not matter whether you apply it in the middle (as you did) or at the very end.
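
The batching idea above can be sketched in plain Java (a simulation, not Flink code). It assumes each batch's watermark is the highest timestamp in that batch, and that events are buffered and released in timestamp order once the watermark reaches them; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Plain-Java simulation of per-batch watermarks; not Flink code. */
final class BatchWatermarkSketch {
    // Buffered event timestamps, smallest first.
    private final PriorityQueue<Long> buffer = new PriorityQueue<>();
    // Watermarks must never move backwards, so keep the highest one seen.
    private long currentWatermark = Long.MIN_VALUE;

    /** Watermark for a batch: the highest event timestamp it contains. */
    static long watermarkFor(List<Long> batchTimestamps) {
        long max = Long.MIN_VALUE;
        for (long ts : batchTimestamps) {
            max = Math.max(max, ts);
        }
        return max;
    }

    /** Buffers the batch, advances the watermark, and releases ready events in timestamp order. */
    List<Long> onBatch(List<Long> batchTimestamps) {
        buffer.addAll(batchTimestamps);
        currentWatermark = Math.max(currentWatermark, watermarkFor(batchTimestamps));
        List<Long> released = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek() <= currentWatermark) {
            released.add(buffer.poll());
        }
        return released;
    }
}
```

Because the buffer is ordered by timestamp, the caller does not have to sort a batch before handing it over; an unsorted batch still comes out in timestamp order.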

Best,
Dawid

> On 19 Mar 2018, at 11:31, Shailesh Jain <sh...@stellapps.com> wrote:
> 
> Thanks for your reply, Dawid.
> 
> I understand that the approach I've tried out is not generic enough, and
> would need a lot more thought to be put into w.r.t parallelism
> considerations, out of order events, effects on downstream operators etc.
> The intention was to do a quick implementation to check the feasibility of
> the approach.
> 
>>> It will also not sort the events etc.
> 
> In the application code to test this approach, I had used a Global window
> to sort events based on their timestamp (similar to how out of order events
> are dropped based on a time-bound, I'm dropping them based on a count based
> bound).
> 
> allEvents = allEvents
>        .keyBy(event -> event.getAttributeValue(EVENT_SOURCE_ID))
>        .window(GlobalWindows.create())
>        .trigger(new GlobalWindowCountTrigger(propLoader.getSortWindowSize()))
>        .process(new SortWindowProcessFunction())
>        .keyBy(event -> event.getAttributeValue(EVENT_SOURCE_ID))
>        .assignTimestampsAndWatermarks(new TimestampsExtractor())
>        .uid(Constants.TS_EX_UID);
> PatternLoader
>        .applyPatterns(allEvents, propLoader.getPatternClassNames())
>        .addSink(createKafkaSink(kafkaProps))
>        .uid(Constants.KAFKA_SINK_COMPLEX_EVENTS_UID);
> 
> 
>>> If in the getCurrentWatermark method of your
> AssignerWithPeriodicWatermarks you will just return
>>> new Watermark(System.currentTimeMillis()), you will get the same
> behaviour as with that change,
>>> am I right?
> 
> If watermarks are generated based on the machine time, the major issue I
> see is that we will not be able to leverage Event Time functionality.
> Specifically, if I have patterns which look for the absence of an Event for
> a fixed period of time.
> 
> For eg. We have many such patterns:
> 
> Pattern<Event, Event> pattern = Pattern.<Event>begin
>        (UNDER_CHILLED_ALERT, AfterMatchSkipStrategy
>                .skipPastLastEvent())
>        .where(Conditions.getUnderchilledCondition())
>        .notFollowedBy(COMPRESSOR_ON)
>        .where(Conditions.getCompressorOnCondition())
>        .within(Time.minutes(30))
>        .followedBy(HIGH_TEMP)
>        .where(Conditions.getHighTemperatureCondition());
> 
> Now when there are network issues (which are very frequent), queued events
> are delivered together, and such patterns will not be matched correctly as
> pruning of events from NFA's buffer will not be done based on the timestamp
> within the event, but on the watermark received by the operator.
> 
> Is my understanding here correct?
> 
> Thanks,
> Shailesh
> 
> On Mon, Mar 19, 2018 at 3:17 PM, Dawid Wysakowicz <
> wysakowicz.dawid@gmail.com> wrote:
> 
>> Hi Shailesh,
>> 
>> Thanks for your interest in the CEP library and sorry for the late response. I
>> must say I am not a fan of this approach.
>> After this change, the Processing time is no longer a processing time,
>> plus it will work differently in any other place of Flink. It will also not
>> sort the events etc.
>> Moreover I think you could achieve pretty similar solution if you generate
>> your watermark based on the machine time. If in the getCurrentWatermark
>> method
>> of your AssignerWithPeriodicWatermarks you will just return new
>> Watermark(System.currentTimeMillis()), you will get the same behaviour as
>> with that change, am I right?
>> 
>> Best,
>> Dawid
>> 
>>> On 18 Mar 2018, at 09:00, Shailesh Jain <sh...@stellapps.com>
>> wrote:
>>> 
>>> Thanks Aljoscha.
>>> 
>>> Bump.
>>> 
>>> I understand everyone would be busy with 1.5.0, but would really
>> appreciate
>>> slight help in unblocking us here.
>>> 
>>> Thanks,
>>> Shailesh
>>> 
>>> On Thu, Mar 15, 2018 at 1:47 AM, Aljoscha Krettek <al...@apache.org>
>>> wrote:
>>> 
>>>> Hi,
>>>> 
>>>> I think this should have been sent to the dev mailing list because in
>> the
>>>> user mailing list it might disappear among a lot of other mail.
>>>> 
>>>> Forwarding...
>>>> 
>>>> Best,
>>>> Aljoscha
>>>> 
>>>>> On 14. Mar 2018, at 06:20, Shailesh Jain <sh...@stellapps.com>
>>>> wrote:
>>>>> 
>>>>> Hi,
>>>>> 
>>>>> We've been facing issues* w.r.t watermarks not supported per key, which
>>>> led us to:
>>>>> 
>>>>> Either (a) run the job in Processing time for a KeyedStream ->
>>>> compromising on use cases which revolve around catching time-based
>> patterns
>>>>> or (b) run the job in Event time for multiple data streams (one data
>>>> stream per key) -> this is not scalable as the number of operators grow
>>>> linearly with the number of keys
>>>>> 
>>>>> To address this, we've done a quick (poc) change in the
>>>> AbstractKeyedCEPPatternOperator to allow for the NFAs to progress based
>>>> on timestamps extracted from the events arriving into the operator (and
>> not
>>>> from the watermarks). We've tested it against our usecase and are
>> seeing a
>>>> significant improvement in memory usage without compromising on the
>>>> watermark functionality.
>>>>> 
>>>>> It'll be really helpful if someone from the cep dev group can take a
>>>> look at this branch - https://github.com/jainshailesh/flink/commits/
>>>> cep_changes <https://github.com/jainshailesh/flink/commits/cep_changes>
>>>> and provide comments on the approach taken, and maybe guide us on the
>> next
>>>> steps for taking it forward.
>>>>> 
>>>>> Thanks,
>>>>> Shailesh
>>>>> 
>>>>> * Links to previous email threads related to the same issue:
>>>>> http://apache-flink-user-mailing-list-archive.2336050.
>>>> n4.nabble.com/Question-on-event-time-functionality-
>>>> using-Flink-in-a-IoT-usecase-td18653.html <http://apache-flink-user-
>>>> mailing-list-archive.2336050.n4.nabble.com/Question-on-
>>>> event-time-functionality-using-Flink-in-a-IoT-usecase-td18653.html>
>>>>> http://apache-flink-user-mailing-list-archive.2336050.
>>>> n4.nabble.com/Generate-watermarks-per-key-in-a-KeyedStream-td16629.html
>> <
>>>> http://apache-flink-user-mailing-list-archive.2336050.
>>>> n4.nabble.com/Generate-watermarks-per-key-in-a-KeyedStream-td16629.html
>>> 
>>>>> http://apache-flink-user-mailing-list-archive.2336050.
>>>> n4.nabble.com/Correlation-between-number-of-operators-
>>>> and-Job-manager-memory-requirements-td18384.html <
>>>> http://apache-flink-user-mailing-list-archive.2336050.
>>>> n4.nabble.com/Correlation-between-number-of-operators-
>>>> and-Job-manager-memory-requirements-td18384.html>
>>>>> 
>>>> 
>>>> 
>> 
>> 


Re: [Proposal] CEP library changes - review request

Posted by Shailesh Jain <sh...@stellapps.com>.
Thanks for your reply, Dawid.

I understand that the approach I've tried out is not generic enough, and
that it needs a lot more thought w.r.t. parallelism, out-of-order events,
effects on downstream operators, etc. The intention was to do a quick
implementation to check the feasibility of the approach.

>> It will also not sort the events etc.

In the application code used to test this approach, I used a global window
to sort events by timestamp (similar to how out-of-order events are dropped
based on a time bound, I'm dropping them based on a count-based bound).

allEvents = allEvents
        .keyBy(event -> event.getAttributeValue(EVENT_SOURCE_ID))
        .window(GlobalWindows.create())
        .trigger(new GlobalWindowCountTrigger(propLoader.getSortWindowSize()))
        .process(new SortWindowProcessFunction())
        .keyBy(event -> event.getAttributeValue(EVENT_SOURCE_ID))
        .assignTimestampsAndWatermarks(new TimestampsExtractor())
        .uid(Constants.TS_EX_UID);
PatternLoader
        .applyPatterns(allEvents, propLoader.getPatternClassNames())
        .addSink(createKafkaSink(kafkaProps))
        .uid(Constants.KAFKA_SINK_COMPLEX_EVENTS_UID);
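
The count-bounded sorting step can be sketched in plain Java as follows. This is not the GlobalWindowCountTrigger or SortWindowProcessFunction used above (their code is not shown in this thread); the class name, the per-key buffers, and the rule that an event older than the last emitted timestamp for its key is dropped are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch of a per-key, count-bounded sort buffer; not Flink code. */
final class CountBoundedSorter {
    private final int batchSize;
    private final Map<String, List<Long>> pending = new HashMap<>();
    private final Map<String, Long> lastEmitted = new HashMap<>();

    CountBoundedSorter(int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        this.batchSize = batchSize;
    }

    /** Returns a sorted batch once batchSize events are buffered for the key, else an empty list. */
    List<Long> add(String key, long timestamp) {
        Long floor = lastEmitted.get(key);
        if (floor != null && timestamp < floor) {
            // Assumed drop rule: older than what was already emitted for this key.
            return Collections.emptyList();
        }
        List<Long> buffer = pending.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(timestamp);
        if (buffer.size() < batchSize) {
            return Collections.emptyList();
        }
        Collections.sort(buffer);
        List<Long> batch = new ArrayList<>(buffer);
        lastEmitted.put(key, batch.get(batch.size() - 1));
        buffer.clear();
        return batch;
    }
}
```

The batch size plays the role that the allowed lateness plays in a time-based bound: an event can be reordered only if it arrives within the same batch as its neighbours.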


>> If in the getCurrentWatermark method of your
>> AssignerWithPeriodicWatermarks you will just return
>> new Watermark(System.currentTimeMillis()), you will get the same
>> behaviour as with that change,
>> am I right?

If watermarks are generated based on the machine time, the major issue I
see is that we will not be able to leverage event-time functionality,
specifically for patterns which look for the absence of an event for a
fixed period of time.

For example, we have many such patterns:

Pattern<Event, Event> pattern = Pattern.<Event>begin
        (UNDER_CHILLED_ALERT, AfterMatchSkipStrategy
                .skipPastLastEvent())
        .where(Conditions.getUnderchilledCondition())
        .notFollowedBy(COMPRESSOR_ON)
        .where(Conditions.getCompressorOnCondition())
        .within(Time.minutes(30))
        .followedBy(HIGH_TEMP)
        .where(Conditions.getHighTemperatureCondition());

Now when there are network issues (which are very frequent), queued events
are delivered together, and such patterns will not be matched correctly as
pruning of events from NFA's buffer will not be done based on the timestamp
within the event, but on the watermark received by the operator.
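
To illustrate the concern with made-up numbers, here is a small plain-Java sketch (not Flink code). It assumes a 30-minute window and treats an event as inside the window when the elapsed time is strictly less than the window length; that boundary rule and all names are assumptions.

```java
import java.util.concurrent.TimeUnit;

/** Plain-Java illustration of a 30-minute window measured on two different clocks. */
final class WithinWindowSketch {
    static final long WINDOW_MS = TimeUnit.MINUTES.toMillis(30);

    /** True if the second timestamp falls inside the window opened by the first. */
    static boolean withinWindow(long firstMs, long secondMs) {
        return secondMs - firstMs < WINDOW_MS;
    }
}
```

Suppose the under-chilled alert was generated at minute 0 and the high-temperature event at minute 45, but both were queued during an outage and arrived one millisecond apart. Measured on event timestamps, `withinWindow(0, TimeUnit.MINUTES.toMillis(45))` is false; measured on arrival times, `withinWindow(1000, 1001)` is true, so the two clocks lead to different pruning decisions.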

Is my understanding here correct?

Thanks,
Shailesh

On Mon, Mar 19, 2018 at 3:17 PM, Dawid Wysakowicz <
wysakowicz.dawid@gmail.com> wrote:

> Hi Shailesh,
>
> Thanks for your interest in the CEP library and sorry for the late response. I
> must say I am not a fan of this approach.
> After this change, the Processing time is no longer a processing time,
> plus it will work differently in any other place of Flink. It will also not
> sort the events etc.
> Moreover I think you could achieve pretty similar solution if you generate
> your watermark based on the machine time. If in the getCurrentWatermark
> method
> of your AssignerWithPeriodicWatermarks you will just return new
> Watermark(System.currentTimeMillis()), you will get the same behaviour as
> with that change, am I right?
>
> Best,
> Dawid
>
> > On 18 Mar 2018, at 09:00, Shailesh Jain <sh...@stellapps.com>
> wrote:
> >
> > Thanks Aljoscha.
> >
> > Bump.
> >
> > I understand everyone would be busy with 1.5.0, but would really
> appreciate
> > slight help in unblocking us here.
> >
> > Thanks,
> > Shailesh
> >
> > On Thu, Mar 15, 2018 at 1:47 AM, Aljoscha Krettek <al...@apache.org>
> > wrote:
> >
> >> Hi,
> >>
> >> I think this should have been sent to the dev mailing list because in
> the
> >> user mailing list it might disappear among a lot of other mail.
> >>
> >> Forwarding...
> >>
> >> Best,
> >> Aljoscha
> >>
> >>> On 14. Mar 2018, at 06:20, Shailesh Jain <sh...@stellapps.com>
> >> wrote:
> >>>
> >>> Hi,
> >>>
> >>> We've been facing issues* w.r.t watermarks not supported per key, which
> >> led us to:
> >>>
> >>> Either (a) run the job in Processing time for a KeyedStream ->
> >> compromising on use cases which revolve around catching time-based
> patterns
> >>> or (b) run the job in Event time for multiple data streams (one data
> >> stream per key) -> this is not scalable as the number of operators grow
> >> linearly with the number of keys
> >>>
> >>> To address this, we've done a quick (poc) change in the
> >> AbstractKeyedCEPPatternOperator to allow for the NFAs to progress based
> >> on timestamps extracted from the events arriving into the operator (and
> not
> >> from the watermarks). We've tested it against our usecase and are
> seeing a
> >> significant improvement in memory usage without compromising on the
> >> watermark functionality.
> >>>
> >>> It'll be really helpful if someone from the cep dev group can take a
> >> look at this branch - https://github.com/jainshailesh/flink/commits/
> >> cep_changes <https://github.com/jainshailesh/flink/commits/cep_changes>
> >> and provide comments on the approach taken, and maybe guide us on the
> next
> >> steps for taking it forward.
> >>>
> >>> Thanks,
> >>> Shailesh
> >>>
> >>> * Links to previous email threads related to the same issue:
> >>> http://apache-flink-user-mailing-list-archive.2336050.
> >> n4.nabble.com/Question-on-event-time-functionality-
> >> using-Flink-in-a-IoT-usecase-td18653.html <http://apache-flink-user-
> >> mailing-list-archive.2336050.n4.nabble.com/Question-on-
> >> event-time-functionality-using-Flink-in-a-IoT-usecase-td18653.html>
> >>> http://apache-flink-user-mailing-list-archive.2336050.
> >> n4.nabble.com/Generate-watermarks-per-key-in-a-KeyedStream-td16629.html
> <
> >> http://apache-flink-user-mailing-list-archive.2336050.
> >> n4.nabble.com/Generate-watermarks-per-key-in-a-KeyedStream-td16629.html
> >
> >>> http://apache-flink-user-mailing-list-archive.2336050.
> >> n4.nabble.com/Correlation-between-number-of-operators-
> >> and-Job-manager-memory-requirements-td18384.html <
> >> http://apache-flink-user-mailing-list-archive.2336050.
> >> n4.nabble.com/Correlation-between-number-of-operators-
> >> and-Job-manager-memory-requirements-td18384.html>
> >>>
> >>
> >>
>
>

Re: [Proposal] CEP library changes - review request

Posted by Dawid Wysakowicz <wy...@gmail.com>.
Hi Shailesh,

Thanks for your interest in the CEP library, and sorry for the late response. I must say I am not a fan of this approach.
After this change, processing time is no longer really processing time, and it will work differently in CEP than anywhere else in Flink. It will also not sort the events, etc.
Moreover, I think you could achieve a pretty similar result if you generated your watermarks based on machine time. If the getCurrentWatermark method
of your AssignerWithPeriodicWatermarks just returns new Watermark(System.currentTimeMillis()), you will get the same behaviour as with that change, am I right?
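
As a rough illustration of this suggestion, the following plain-Java sketch uses a local stand-in interface rather than Flink's AssignerWithPeriodicWatermarks, so that it compiles without Flink on the classpath. The interface, the class, and the injectable clock are hypothetical; only the idea (the watermark follows the wall clock, not the event timestamps) comes from the message.

```java
import java.util.function.LongSupplier;

/** Local stand-in for a periodic watermark assigner; NOT Flink's interface. */
interface PeriodicWatermarkSource<T> {
    long extractTimestamp(T element);

    long currentWatermark();
}

/** Events keep their own timestamps, but the watermark is simply the machine time. */
final class MachineTimeWatermarks implements PeriodicWatermarkSource<Long> {
    private final LongSupplier clock;

    MachineTimeWatermarks(LongSupplier clock) {
        this.clock = clock;
    }

    @Override
    public long extractTimestamp(Long eventTimestampMs) {
        // The event's own timestamp is passed through unchanged.
        return eventTimestampMs;
    }

    @Override
    public long currentWatermark() {
        // Independent of any event seen so far.
        return clock.getAsLong();
    }
}
```

In a real job the clock would be `System::currentTimeMillis`; passing it in as a `LongSupplier` only makes the sketch easy to exercise with a fixed time.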

Best,
Dawid

> On 18 Mar 2018, at 09:00, Shailesh Jain <sh...@stellapps.com> wrote:
> 
> Thanks Aljoscha.
> 
> Bump.
> 
> I understand everyone would be busy with 1.5.0, but would really appreciate
> slight help in unblocking us here.
> 
> Thanks,
> Shailesh
> 
> On Thu, Mar 15, 2018 at 1:47 AM, Aljoscha Krettek <al...@apache.org>
> wrote:
> 
>> Hi,
>> 
>> I think this should have been sent to the dev mailing list because in the
>> user mailing list it might disappear among a lot of other mail.
>> 
>> Forwarding...
>> 
>> Best,
>> Aljoscha
>> 
>>> On 14. Mar 2018, at 06:20, Shailesh Jain <sh...@stellapps.com>
>> wrote:
>>> 
>>> Hi,
>>> 
>>> We've been facing issues* w.r.t watermarks not supported per key, which
>> led us to:
>>> 
>>> Either (a) run the job in Processing time for a KeyedStream ->
>> compromising on use cases which revolve around catching time-based patterns
>>> or (b) run the job in Event time for multiple data streams (one data
>> stream per key) -> this is not scalable as the number of operators grow
>> linearly with the number of keys
>>> 
>>> To address this, we've done a quick (poc) change in the
>> AbstractKeyedCEPPatternOperator to allow for the NFAs to progress based
>> on timestamps extracted from the events arriving into the operator (and not
>> from the watermarks). We've tested it against our usecase and are seeing a
>> significant improvement in memory usage without compromising on the
>> watermark functionality.
>>> 
>>> It'll be really helpful if someone from the cep dev group can take a
>> look at this branch - https://github.com/jainshailesh/flink/commits/
>> cep_changes <https://github.com/jainshailesh/flink/commits/cep_changes>
>> and provide comments on the approach taken, and maybe guide us on the next
>> steps for taking it forward.
>>> 
>>> Thanks,
>>> Shailesh
>>> 
>>> * Links to previous email threads related to the same issue:
>>> http://apache-flink-user-mailing-list-archive.2336050.
>> n4.nabble.com/Question-on-event-time-functionality-
>> using-Flink-in-a-IoT-usecase-td18653.html <http://apache-flink-user-
>> mailing-list-archive.2336050.n4.nabble.com/Question-on-
>> event-time-functionality-using-Flink-in-a-IoT-usecase-td18653.html>
>>> http://apache-flink-user-mailing-list-archive.2336050.
>> n4.nabble.com/Generate-watermarks-per-key-in-a-KeyedStream-td16629.html <
>> http://apache-flink-user-mailing-list-archive.2336050.
>> n4.nabble.com/Generate-watermarks-per-key-in-a-KeyedStream-td16629.html>
>>> http://apache-flink-user-mailing-list-archive.2336050.
>> n4.nabble.com/Correlation-between-number-of-operators-
>> and-Job-manager-memory-requirements-td18384.html <
>> http://apache-flink-user-mailing-list-archive.2336050.
>> n4.nabble.com/Correlation-between-number-of-operators-
>> and-Job-manager-memory-requirements-td18384.html>
>>> 
>> 
>> 


Re: [Proposal] CEP library changes - review request

Posted by Shailesh Jain <sh...@stellapps.com>.
Thanks Aljoscha.

Bump.

I understand everyone is busy with 1.5.0, but we would really appreciate
a little help in unblocking us here.

Thanks,
Shailesh

On Thu, Mar 15, 2018 at 1:47 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
>
> I think this should have been sent to the dev mailing list because in the
> user mailing list it might disappear among a lot of other mail.
>
> Forwarding...
>
> Best,
> Aljoscha
>
> > On 14. Mar 2018, at 06:20, Shailesh Jain <sh...@stellapps.com>
> wrote:
> >
> > Hi,
> >
> > We've been facing issues* w.r.t watermarks not supported per key, which
> led us to:
> >
> > Either (a) run the job in Processing time for a KeyedStream ->
> compromising on use cases which revolve around catching time-based patterns
> > or (b) run the job in Event time for multiple data streams (one data
> stream per key) -> this is not scalable as the number of operators grow
> linearly with the number of keys
> >
> > To address this, we've done a quick (poc) change in the
> AbstractKeyedCEPPatternOperator to allow for the NFAs to progress based
> on timestamps extracted from the events arriving into the operator (and not
> from the watermarks). We've tested it against our usecase and are seeing a
> significant improvement in memory usage without compromising on the
> watermark functionality.
> >
> > It'll be really helpful if someone from the cep dev group can take a
> look at this branch - https://github.com/jainshailesh/flink/commits/
> cep_changes <https://github.com/jainshailesh/flink/commits/cep_changes>
> and provide comments on the approach taken, and maybe guide us on the next
> steps for taking it forward.
> >
> > Thanks,
> > Shailesh
> >
> > * Links to previous email threads related to the same issue:
> > http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/Question-on-event-time-functionality-
> using-Flink-in-a-IoT-usecase-td18653.html <http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Question-on-
> event-time-functionality-using-Flink-in-a-IoT-usecase-td18653.html>
> > http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/Generate-watermarks-per-key-in-a-KeyedStream-td16629.html <
> http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/Generate-watermarks-per-key-in-a-KeyedStream-td16629.html>
> > http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/Correlation-between-number-of-operators-
> and-Job-manager-memory-requirements-td18384.html <
> http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/Correlation-between-number-of-operators-
> and-Job-manager-memory-requirements-td18384.html>
> >
>
>

Re: [Proposal] CEP library changes - review request

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

I think this should have been sent to the dev mailing list because in the user mailing list it might disappear among a lot of other mail.

Forwarding...

Best,
Aljoscha

> On 14. Mar 2018, at 06:20, Shailesh Jain <sh...@stellapps.com> wrote:
> 
> Hi,
> 
> We've been facing issues* w.r.t watermarks not supported per key, which led us to:
> 
> Either (a) run the job in Processing time for a KeyedStream -> compromising on use cases which revolve around catching time-based patterns
> or (b) run the job in Event time for multiple data streams (one data stream per key) -> this is not scalable as the number of operators grow linearly with the number of keys
> 
> To address this, we've done a quick (poc) change in the AbstractKeyedCEPPatternOperator to allow for the NFAs to progress based on timestamps extracted from the events arriving into the operator (and not from the watermarks). We've tested it against our usecase and are seeing a significant improvement in memory usage without compromising on the watermark functionality.
> 
> It'll be really helpful if someone from the cep dev group can take a look at this branch - https://github.com/jainshailesh/flink/commits/cep_changes <https://github.com/jainshailesh/flink/commits/cep_changes> and provide comments on the approach taken, and maybe guide us on the next steps for taking it forward. 
> 
> Thanks,
> Shailesh
> 
> * Links to previous email threads related to the same issue:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Question-on-event-time-functionality-using-Flink-in-a-IoT-usecase-td18653.html <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Question-on-event-time-functionality-using-Flink-in-a-IoT-usecase-td18653.html>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Generate-watermarks-per-key-in-a-KeyedStream-td16629.html <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Generate-watermarks-per-key-in-a-KeyedStream-td16629.html>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Correlation-between-number-of-operators-and-Job-manager-memory-requirements-td18384.html <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Correlation-between-number-of-operators-and-Job-manager-memory-requirements-td18384.html>
>