Posted to user@flink.apache.org by David Koch <og...@googlemail.com> on 2016/10/06 13:08:19 UTC

Listening to timed-out patterns in Flink CEP

Hello,

With Flink CEP, is there a way to actively listen to pattern matches that
time out? I am under the impression that this is not possible.

In my case I partition a stream containing user web navigation by "userId"
to look for sequences of Event A, followed by B within 4 seconds for each
user.

I registered a PatternTimeoutFunction which, assuming a non-match, only
fires upon the first event after the specified timeout. For example, given
user X: Event A, 20 seconds later Event B (or any other type of event).

I'd rather have a notification fire directly upon the 4 second interval
expiring since passive invalidation is not really applicable in my case.

How, if at all, can this be achieved with Flink CEP?

Thanks,

David
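
The behaviour described in this message can be illustrated with a small
stand-alone sketch. It is plain Java with no Flink dependency, and every name
in it (PassiveTimeoutSketch, onEvent, WINDOW_MS) is hypothetical. It only
models a per-user check whose clock moves when an event arrives, which is an
assumption about why the timeout is noticed late, not a description of the
CEP operator's internals.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Flink code: a per-user "A followed by B within 4 s"
// check whose notion of time only advances when an event for that user arrives.
final class PassiveTimeoutSketch {
    static final long WINDOW_MS = 4_000L;

    // userId -> timestamp of a pending Event A
    private final Map<String, Long> pendingA = new HashMap<>();
    // Human-readable record of what was detected and when
    final List<String> log = new ArrayList<>();

    void onEvent(String userId, String type, long timestampMs) {
        Long start = pendingA.get(userId);
        if (start != null && timestampMs - start >= WINDOW_MS) {
            // The expiry is only noticed now, however long ago it happened.
            log.add("timeout(" + userId + ") noticed at " + timestampMs);
            pendingA.remove(userId);
            start = null;
        }
        if (start != null && type.equals("B")) {
            log.add("match(" + userId + ") at " + timestampMs);
            pendingA.remove(userId);
        } else if (type.equals("A")) {
            // Start (or restart) waiting for a B.
            pendingA.put(userId, timestampMs);
        }
    }

    public static void main(String[] args) {
        PassiveTimeoutSketch sketch = new PassiveTimeoutSketch();
        sketch.onEvent("X", "A", 0L);
        sketch.onEvent("X", "B", 20_000L); // arrives 20 seconds later
        sketch.log.forEach(System.out::println);
    }
}
```

In this sketch the 4 second window for user X expires at t = 4000, but nothing
is recorded until the next event for X arrives at t = 20000.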

Re: Listening to timed-out patterns in Flink CEP

Posted by Till Rohrmann <tr...@apache.org>.
Great to hear that things are now working :-)


Re: Listening to timed-out patterns in Flink CEP

Posted by David Koch <og...@googlemail.com>.
Hello,

It's been a while and I have never replied on the list. In fact, the fix
committed by Till does work. Thanks!


Re: Listening to timed-out patterns in Flink CEP

Posted by Moiz Jinia <mo...@gmail.com>.
Hey David,
Did that work for you? If yes could you share an example. I have a similar
use case - need to get notified of an event NOT occurring within a specified
time window.

Thanks much!

Moiz




Re: Listening to timed-out patterns in Flink CEP

Posted by David Koch <og...@googlemail.com>.
Hi Till,

Excellent - I'll check out the current snapshot version! Thank you for
taking the time to look into this.

Regards,

David

On Tue, Nov 8, 2016 at 3:25 PM, Till Rohrmann <tr...@apache.org> wrote:

> Hi David,
>
> sorry for my late reply. I just found time to look into the problem. You
> were right with your observation that the CEP operator did not behave as
> I've described it. The problem was that the time of the underlying NFA was
> not advanced if there were no events buffered in the CEP operator when a
> new watermark arrived. This was not intended and I opened a PR [1] to fix
> this problem. I've tested the fix with your example program and it seems to
> solve the problem that you don't see timeouts after the timeout interval
> has passed. Thanks for reporting this problem and please excuse my long
> response time.
>
> Btw, I'll merge the PR this evening. So it should be included in the
> current snapshot version by the end of tomorrow.
>
> [1] https://github.com/apache/flink/pull/2771
>
> Cheers,
> Till
>
> On Fri, Oct 14, 2016 at 11:40 AM, Till Rohrmann <tr...@apache.org>
> wrote:
>
>> Hi guys,
>>
>> I'll try to come up with an example illustrating the behaviour over the
>> weekend.
>>
>> Cheers,
>> Till
>>
>> On Fri, Oct 14, 2016 at 11:16 AM, David Koch <og...@googlemail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> Thanks for the code Sameer. Unfortunately, it didn't solve the issue.
>>> Compared to what I did the principle is the same - make sure that the
>>> watermark advances even without events present to trigger timeouts in CEP
>>> patterns.
>>>
>>> If Till or anyone else could provide a minimal example illustrating the
>>> supposed behaviour of:
>>>
>>>> [CEP] timeout will be detected when the first watermark exceeding the
>>>> timeout value is received
>>>
>>>
>>> I'd very much appreciate it.
>>>
>>> Regards,
>>>
>>> David
>>>
>>>
>>> On Wed, Oct 12, 2016 at 1:54 AM, Sameer W <sa...@axiomine.com> wrote:
>>>
>>>> Try this. Your WM's need to move forward. Also don't use System
>>>> Timestamp. Use the timestamp of the element seen as the reference as the
>>>> elements are most likely lagging the system timestamp.
>>>>
>>>> DataStream<Event> withTimestampsAndWatermarks = tuples
>>>>         .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Event>() {
>>>>
>>>>             long waterMarkTmst;
>>>>             long lastEmittedWM = 0;
>>>>
>>>>             @Override
>>>>             public long extractTimestamp(Event element, long previousElementTimestamp) {
>>>>                 if (element.tmst > lastEmittedWM) {
>>>>                     // Assumes increasing timestamps. Subtract 1 because more
>>>>                     // elements with the same timestamp might arrive.
>>>>                     waterMarkTmst = element.tmst - 1;
>>>>                 }
>>>>                 return element.tmst;
>>>>             }
>>>>
>>>>             @Override
>>>>             public Watermark getCurrentWatermark() {
>>>>                 if (lastEmittedWM == waterMarkTmst) {
>>>>                     // No new event seen: move the watermark forward by the auto
>>>>                     // watermark interval (watermarks only move forward in time).
>>>>                     waterMarkTmst = waterMarkTmst + 1000L;
>>>>                 }
>>>>                 lastEmittedWM = waterMarkTmst;
>>>>
>>>>                 System.out.println(String.format("Watermark at %s", new Date(waterMarkTmst)));
>>>>                 // Until an event is seen, the watermark starts at 0 and
>>>>                 // advances by 1000 ms on each call.
>>>>                 return new Watermark(waterMarkTmst);
>>>>             }
>>>>         }).keyBy("key");
>>>>
>>>> On Tue, Oct 11, 2016 at 7:29 PM, David Koch <og...@googlemail.com>
>>>> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I tried setting the watermark to System.currentTimeMillis() - 5000L,
>>>>> event timestamps are System.currentTimeMillis(). I do not observe the
>>>>> expected behaviour of the PatternTimeoutFunction firing once the watermark
>>>>> moves past the timeout "anchored" by a pattern match.
>>>>>
>>>>> Here is the complete test class source <http://pastebin.com/9WxGq2wv>,
>>>>> in case someone is interested. The timestamp/watermark assigner looks like
>>>>> this:
>>>>>
>>>>> DataStream<Event> withTimestampsAndWatermarks = tuples
>>>>>         .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Event>() {
>>>>>
>>>>>             long waterMarkTmst;
>>>>>
>>>>>             @Override
>>>>>             public long extractTimestamp(Event element, long previousElementTimestamp) {
>>>>>                 return element.tmst;
>>>>>             }
>>>>>
>>>>>             @Override
>>>>>             public Watermark getCurrentWatermark() {
>>>>>                 waterMarkTmst = System.currentTimeMillis() - 5000L;
>>>>>                 System.out.println(String.format("Watermark at %s", new Date(waterMarkTmst)));
>>>>>                 return new Watermark(waterMarkTmst);
>>>>>             }
>>>>>         }).keyBy("key");
>>>>>
>>>>> withTimestampsAndWatermarks.getExecutionConfig().setAutoWatermarkInterval(1000L);
>>>>>
>>>>> // Apply pattern filtering on stream.
>>>>> PatternStream<Event> patternStream = CEP.pattern(withTimestampsAndWatermarks, pattern);
>>>>>
>>>>> Any idea what's wrong?
>>>>>
>>>>> David
>>>>>
>>>>>
>>>>> On Tue, Oct 11, 2016 at 10:20 PM, Sameer W <sa...@axiomine.com>
>>>>> wrote:
>>>>>
>>>>>> Assuming an element with timestamp which is later than the last
>>>>>> emitted watermark arrives, would it just be dropped because the
>>>>>> PatternStream does not have a max allowed lateness method? In that case it
>>>>>> appears that CEP cannot handle late events yet out of the box.
>>>>>>
>>>>>> If we do want to support late events, can we chain a
>>>>>> keyBy().timeWindow().allowedLateness(x).map().assignTimestampsAndWatermarks().keyBy()
>>>>>> again before handing it to the CEP operator? This way we may have the
>>>>>> patterns fired multiple times but it allows an event to be late and out of
>>>>>> order. It looks like it will work, but is there a less convoluted way?
>>>>>>
>>>>>> Thanks,
>>>>>> Sameer
>>>>>>
>>>>>> On Tue, Oct 11, 2016 at 12:17 PM, Till Rohrmann <
>>>>>> till.rohrmann@gmail.com> wrote:
>>>>>>
>>>>>>> But then no element later than the last emitted watermark must be
>>>>>>> issued by the sources. If that is the case, then this solution should work.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Tue, Oct 11, 2016 at 4:50 PM, Sameer W <sa...@axiomine.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> If you know that the events are arriving in order and with a consistent
>>>>>>>> lag, why not just increment the watermark time every time the
>>>>>>>> getCurrentWatermark() method is invoked, based on the autoWatermarkInterval
>>>>>>>> (or less to be conservative)?
>>>>>>>>
>>>>>>>> You can check if the watermark has changed since the arrival of the
>>>>>>>> last event and, if not, increment it in the getCurrentWatermark() method.
>>>>>>>> Otherwise the watermark will never increase until an element arrives, and if
>>>>>>>> the stream partition stalls for some reason the whole pipeline freezes.
>>>>>>>>
>>>>>>>> Sameer
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Oct 11, 2016 at 6:04 AM, Till Rohrmann <
>>>>>>>> till.rohrmann@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi David,
>>>>>>>>>
>>>>>>>>> the problem is still that there is no corresponding watermark
>>>>>>>>> saying that 4 seconds have now passed. With your code, watermarks will be
>>>>>>>>> periodically emitted but the same watermark will be emitted until a new
>>>>>>>>> element arrives which will reset the watermark. Thus, the system can never
>>>>>>>>> know until this watermark is seen whether there will be an earlier event or
>>>>>>>>> not. I fear that this is a fundamental problem with stream processing.
>>>>>>>>>
>>>>>>>>> You're right that the negation operator won't solve the problem.
>>>>>>>>> It will indeed suffer from the same problem.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Till
>>>>>>>>>
>>>>>>>>> On Sun, Oct 9, 2016 at 7:37 PM, <lg...@yahoo.com> wrote:
>>>>>>>>>
>>>>>>>>>> >>FLINK-3320 <https://issues.apache.org/jira/browse/FLINK-3320> (CEP
>>>>>>>>>> "not" operator) does not address this because again, how would the "not
>>>>>>>>>> match" be triggered if no event at all occurs?
>>>>>>>>>>
>>>>>>>>>> Good question.
>>>>>>>>>>
>>>>>>>>>> I'm not sure whether the following will work:
>>>>>>>>>>
>>>>>>>>>> This could be done by creating a CEP matching pattern that uses
>>>>>>>>>> both of "notNext" (or "notFollowedBy") and "within" constructs. Something
>>>>>>>>>> like this:
>>>>>>>>>>
>>>>>>>>>> Pattern<Event, ?> pattern = Pattern.<Event>begin("first")
>>>>>>>>>>     .notNext("second")
>>>>>>>>>>     .within(Time.seconds(3));
>>>>>>>>>>
>>>>>>>>>> I'm hoping Flink CEP experts (Till?) will comment on this.
>>>>>>>>>>
>>>>>>>>>> Note: I have requested these negation patterns to be implemented
>>>>>>>>>> in Flink CEP, but notNext/notFollowedBy are not yet implemented in Flink.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> - LF
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> ------------------------------
>>>>>>>>>> *From:* David Koch <og...@googlemail.com>
>>>>>>>>>> *To:* user@flink.apache.org; lgfmt@yahoo.com
>>>>>>>>>> *Sent:* Sunday, October 9, 2016 5:51 AM
>>>>>>>>>>
>>>>>>>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>>>>>>>
>>>>>>>>>> Hello,
>>>>>>>>>>
>>>>>>>>>> Thank you for the explanation as well as the link to the other
>>>>>>>>>> post. Interesting to learn about some of the open JIRAs.
>>>>>>>>>>
>>>>>>>>>> Indeed, I was not using event time, but processing time. However,
>>>>>>>>>> even when using event time I only get notified of timeouts upon subsequent
>>>>>>>>>> events.
>>>>>>>>>>
>>>>>>>>>> The link <http://pastebin.com/x4m3RHQz> contains an example
>>>>>>>>>> where I read <key> <value> from a socket, wrap this in a custom "event"
>>>>>>>>>> with timestamp, key the resultant stream by <key> and attempt to detect
>>>>>>>>>> <key> instances no further than 3 seconds apart using CEP.
>>>>>>>>>>
>>>>>>>>>> Apart from the fact that results are only printed when I close
>>>>>>>>>> the socket (normal?), I don't observe any change in behaviour.
>>>>>>>>>>
>>>>>>>>>> So event-time/watermarks or not: SOME event has to occur for the
>>>>>>>>>> timeout to be triggered.
>>>>>>>>>>
>>>>>>>>>> FLINK-3320 <https://issues.apache.org/jira/browse/FLINK-3320> (CEP
>>>>>>>>>> "not" operator) does not address this because again, how would the "not
>>>>>>>>>> match" be triggered if no event at all occurs?
>>>>>>>>>>
>>>>>>>>>> On Sat, Oct 8, 2016 at 12:50 AM, <lg...@yahoo.com> wrote:
>>>>>>>>>>
>>>>>>>>>> The following is a better link:
>>>>>>>>>>
>>>>>>>>>> http://mail-archives.apache.org/mod_mbox/flink-user/201609.mbox/%3CCAC27z%3DOTtv7USYUm82bE43-DkoGfVC4UAWD6uQwwRgTsE5be8g%40mail.gmail.com%3E
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> - LF
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> ------------------------------
>>>>>>>>>> *From:* "lgfmt@yahoo.com" <lg...@yahoo.com>
>>>>>>>>>> *To:* "user@flink.apache.org" <us...@flink.apache.org>
>>>>>>>>>> *Sent:* Friday, October 7, 2016 3:36 PM
>>>>>>>>>>
>>>>>>>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>>>>>>>
>>>>>>>>>> Doesn't the upcoming CEP negation (absence of an event) feature
>>>>>>>>>> solve this issue?
>>>>>>>>>>
>>>>>>>>>> See this discussion thread:
>>>>>>>>>> http://mail-archives.apache.org/mod_mbox/flink-user/201609.mbox/%3CCAC27z%3DOD%2BTq8twBw_1YKni5sWAU3g1S9WDpJw0DUwgiG9YX9Fg%40mail.gmail.com%3E
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> //  Atul
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> ------------------------------
>>>>>>>>>> *From:* Till Rohrmann <tr...@apache.org>
>>>>>>>>>> *To:* user@flink.apache.org
>>>>>>>>>> *Sent:* Friday, October 7, 2016 12:58 AM
>>>>>>>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>>>>>>>
>>>>>>>>>> Hi David,
>>>>>>>>>>
>>>>>>>>>> in case of event time, the timeout will be detected when the
>>>>>>>>>> first watermark exceeding the timeout value is received. Thus, it depends a
>>>>>>>>>> little bit how you generate watermarks (e.g. periodically, watermark per
>>>>>>>>>> event).
>>>>>>>>>>
>>>>>>>>>> In case of processing time, the time is only updated whenever a
>>>>>>>>>> new element arrives. Thus, if you have an element arriving 4 seconds after
>>>>>>>>>> Event A, it should detect the timeout. If the next event arrives 20 seconds
>>>>>>>>>> later, then you won't see the timeout until then.
>>>>>>>>>>
>>>>>>>>>> In the case of processing time, we could think about registering
>>>>>>>>>> timeout timers for processing time. However, I would highly recommend you
>>>>>>>>>> to use event time, because with processing time, Flink cannot guarantee
>>>>>>>>>> meaningful computations, because the events might arrive out of order.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Till
>>>>>>>>>>

Re: Listening to timed-out patterns in Flink CEP

Posted by Till Rohrmann <tr...@apache.org>.
Hi David,

sorry for my late reply. I just found time to look into the problem. You
were right with your observation that the CEP operator did not behave as
I've described it. The problem was that the time of the underlying NFA was
not advanced if there were no events buffered in the CEP operator when a
new watermark arrived. This was not intended and I opened a PR [1] to fix
this problem. I've tested the fix with your example program and it seems to
solve the problem that you don't see timeouts after the timeout interval
has passed. Thanks for reporting this problem and please excuse my long
response time.

Btw, I'll merge the PR this evening. So it should be included in the
current snapshot version by the end of tomorrow.

[1] https://github.com/apache/flink/pull/2771

Cheers,
Till
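
The effect of the fix described in this message can be sketched in plain Java:
partial matches are checked for expiry whenever a watermark arrives, whether
or not any events are buffered at that moment. This is a hypothetical
stand-alone model, not the CEP operator or NFA code. The class and method
names are invented for illustration, and the expiry condition
(watermark - start >= window) is an assumption.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not the Flink CEP operator: partial matches are expired
// on every watermark, even when no events are buffered.
final class WatermarkTimeoutSketch {
    private final long windowMs;
    // key -> event-time timestamp at which the partial match started
    private final Map<String, Long> partialMatches = new LinkedHashMap<>();

    WatermarkTimeoutSketch(long windowMs) {
        this.windowMs = windowMs;
    }

    // Records the start of a partial match, e.g. an "Event A" for a user.
    void onStartEvent(String key, long timestampMs) {
        partialMatches.put(key, timestampMs);
    }

    // Called for every watermark; returns the keys whose partial match timed out.
    List<String> onWatermark(long watermarkMs) {
        List<String> timedOut = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = partialMatches.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            // Assumed expiry rule: the window has fully elapsed in event time.
            if (watermarkMs - entry.getValue() >= windowMs) {
                timedOut.add(entry.getKey());
                it.remove();
            }
        }
        return timedOut;
    }

    public static void main(String[] args) {
        WatermarkTimeoutSketch sketch = new WatermarkTimeoutSketch(4_000L);
        sketch.onStartEvent("X", 0L);
        // No further events arrive; only periodic watermarks do.
        for (long wm = 1_000L; wm <= 6_000L; wm += 1_000L) {
            System.out.println("watermark " + wm + " -> timed out: " + sketch.onWatermark(wm));
        }
    }
}
```

With a 4 second window and a start event at t = 0, the sketch reports the
timeout for X on the watermark at 4000 and on no later watermark, without any
further event arriving.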


Re: Listening to timed-out patterns in Flink CEP

Posted by Till Rohrmann <tr...@apache.org>.
Hi guys,

I'll try to come up with an example illustrating the behaviour over the
weekend.

Cheers,
Till

On Fri, Oct 14, 2016 at 11:16 AM, David Koch <og...@googlemail.com> wrote:

> Hello,
>
> Thanks for the code Sameer. Unfortunately, it didn't solve the issue.
> Compared to what I did the principle is the same - make sure that the
> watermark advances even without events present to trigger timeouts in CEP
> patterns.
>
> If Till or anyone else could provide a minimal example illustrating the
> supposed behaviour of:
>
>> [CEP] timeout will be detected when the first watermark exceeding the
>> timeout value is received
>
>
> I'd very much appreciate it.
>
> Regards,
>
> David
>
>
> On Wed, Oct 12, 2016 at 1:54 AM, Sameer W <sa...@axiomine.com> wrote:
>
>> Try this. Your watermarks (WMs) need to move forward. Also, don't use the
>> system timestamp. Use the timestamp of the elements seen as the reference,
>> since the elements most likely lag the system timestamp.
>>
>> DataStream<Event> withTimestampsAndWatermarks = tuples
>>         .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Event>() {
>>
>>             long waterMarkTmst;
>>             long lastEmittedWM = 0;
>>
>>             @Override
>>             public long extractTimestamp(Event element, long previousElementTimestamp) {
>>                 if (element.tmst > lastEmittedWM) {
>>                     // Assumes increasing timestamps. Subtract 1 because more
>>                     // elements with the same timestamp might still arrive.
>>                     waterMarkTmst = element.tmst - 1;
>>                 }
>>                 return element.tmst;
>>             }
>>
>>             @Override
>>             public Watermark getCurrentWatermark() {
>>                 if (lastEmittedWM == waterMarkTmst) {
>>                     // No new event seen: move the WM forward by the auto
>>                     // watermark interval (watermarks only move forward in time).
>>                     waterMarkTmst = waterMarkTmst + 1000L;
>>                 }
>>                 lastEmittedWM = waterMarkTmst;
>>
>>                 System.out.println(String.format("Watermark at %s", new Date(waterMarkTmst)));
>>                 // Until an event is seen, the WM starts at 0 and advances by
>>                 // 1000 ms per call.
>>                 return new Watermark(waterMarkTmst);
>>             }
>>         }).keyBy("key");
>>
>> On Tue, Oct 11, 2016 at 7:29 PM, David Koch <og...@googlemail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> I tried setting the watermark to System.currentTimeMillis() - 5000L,
>>> event timestamps are System.currentTimeMillis(). I do not observe the
>>> expected behaviour of the PatternTimeoutFunction firing once the watermark
>>> moves past the timeout "anchored" by a pattern match.
>>>
>>> Here is the complete test class source <http://pastebin.com/9WxGq2wv>,
>>> in case someone is interested. The timestamp/watermark assigner looks like
>>> this:
>>>
>>> DataStream<Event> withTimestampsAndWatermarks = tuples
>>>         .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Event>() {
>>>
>>>             long waterMarkTmst;
>>>
>>>             @Override
>>>             public long extractTimestamp(Event element, long previousElementTimestamp) {
>>>                 return element.tmst;
>>>             }
>>>
>>>             @Override
>>>             public Watermark getCurrentWatermark() {
>>>                 waterMarkTmst = System.currentTimeMillis() - 5000L;
>>>                 System.out.println(String.format("Watermark at %s", new Date(waterMarkTmst)));
>>>                 return new Watermark(waterMarkTmst);
>>>             }
>>>         }).keyBy("key");
>>>
>>> withTimestampsAndWatermarks.getExecutionConfig().setAutoWatermarkInterval(1000L);
>>>
>>> // Apply pattern filtering on stream.
>>> PatternStream<Event> patternStream = CEP.pattern(withTimestampsAndWatermarks, pattern);
>>>
>>> Any idea what's wrong?
>>>
>>> David
>>>
>>>
>>> On Tue, Oct 11, 2016 at 10:20 PM, Sameer W <sa...@axiomine.com> wrote:
>>>
>>>> Assuming an element with timestamp which is later than the last emitted
>>>> watermark arrives, would it just be dropped because the PatternStream does
>>>> not have a max allowed lateness method? In that case it appears that CEP
>>>> cannot handle late events yet out of the box.
>>>>
>>>> If we do want to support late events can we chain a
>>>> keyBy().timeWindow().allowedLateness(x).map().assignTimestampsAndWatermarks().keyBy()
>>>> again before handing it to the CEP operator. This way we may have the
>>>> patterns fired multiple times but it allows an event to be late and out of
>>>> order. It looks like it will work but is there a less convoluted way.
>>>>
>>>> Thanks,
>>>> Sameer
>>>>
>>>> On Tue, Oct 11, 2016 at 12:17 PM, Till Rohrmann <
>>>> till.rohrmann@gmail.com> wrote:
>>>>
>>>>> But then no element later than the last emitted watermark must be
>>>>> issued by the sources. If that is the case, then this solution should work.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Tue, Oct 11, 2016 at 4:50 PM, Sameer W <sa...@axiomine.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> If you know that the events are arriving in order and a consistent
>>>>>> lag, why not just increment the watermark time every time the
>>>>>> getCurrentWatermark() method is invoked based on the autoWatermarkInterval
>>>>>> (or less to be conservative).
>>>>>>
>>>>>> You can check if the watermark has changed since the arrival of the
>>>>>> last event and if not increment it in the getCurrentWatermark() method.
>>>>>> Otherwise the watermark will never increase until an element arrives, and if
>>>>>> the stream partition stalls for some reason the whole pipeline freezes.
>>>>>>
>>>>>> Sameer
>>>>>>
>>>>>>
>>>>>> On Tue, Oct 11, 2016 at 6:04 AM, Till Rohrmann <
>>>>>> till.rohrmann@gmail.com> wrote:
>>>>>>
>>>>>>> Hi David,
>>>>>>>
>>>>>>> the problem is still that there is no corresponding watermark saying
>>>>>>> that 4 seconds have now passed. With your code, watermarks will be
>>>>>>> periodically emitted but the same watermark will be emitted until a new
>>>>>>> element arrives which will reset the watermark. Thus, the system can never
>>>>>>> know until this watermark is seen whether there will be an earlier event or
>>>>>>> not. I fear that this is a fundamental problem with stream processing.
>>>>>>>
>>>>>>> You're right that the negation operator won't solve the problem. It
>>>>>>> will indeed suffer from the same problem.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Sun, Oct 9, 2016 at 7:37 PM, <lg...@yahoo.com> wrote:
>>>>>>>
>>>>>>>> >>FLINK-3320 <https://issues.apache.org/jira/browse/FLINK-3320> (CEP
>>>>>>>> "not" operator) does not address this because again, how would the "not
>>>>>>>> match" be triggered if no event at all occurs?
>>>>>>>>
>>>>>>>> Good question.
>>>>>>>>
>>>>>>>> I'm not sure whether the following will work:
>>>>>>>>
>>>>>>>> This could be done by creating a CEP matching pattern that uses
>>>>>>>> both of "notNext" (or "notFollowedBy") and "within" constructs. Something
>>>>>>>> like this:
>>>>>>>>
>>>>>>>> Pattern<Event, ?> pattern = Pattern.<Event>begin("first")
>>>>>>>>     .notNext("second")
>>>>>>>>     .within(Time.seconds(3));
>>>>>>>>
>>>>>>>> I'm hoping Flink CEP experts (Till?) will comment on this.
>>>>>>>>
>>>>>>>> Note: I have requested these negation patterns to be implemented in
>>>>>>>> Flink CEP, but notNext/notFollowedBy are not yet implemented in Flink..
>>>>>>>>
>>>>>>>>
>>>>>>>> - LF
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> ------------------------------
>>>>>>>> *From:* David Koch <og...@googlemail.com>
>>>>>>>> *To:* user@flink.apache.org; lgfmt@yahoo.com
>>>>>>>> *Sent:* Sunday, October 9, 2016 5:51 AM
>>>>>>>>
>>>>>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>>>>>
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> Thank you for the explanation as well as the link to the other
>>>>>>>> post. Interesting to learn about some of the open JIRAs.
>>>>>>>>
>>>>>>>> Indeed, I was not using event time, but processing time. However,
>>>>>>>> even when using event time I only get notified of timeouts upon subsequent
>>>>>>>> events.
>>>>>>>>
>>>>>>>> The link <http://pastebin.com/x4m3RHQz> contains an example where
>>>>>>>> I read <key> <value> from a socket, wrap this in a custom "event" with
>>>>>>>> timestamp, key the resultant stream by <key> and attempt to detect <key>
>>>>>>>> instances no further than 3 seconds apart using CEP.
>>>>>>>>
>>>>>>>> Apart from the fact that results are only printed when I close the
>>>>>>>> socket (normal?) I don't observe any change in behaviour
>>>>>>>>
>>>>>>>> So event-time/watermarks or not: SOME event has to occur for the
>>>>>>>> timeout to be triggered.
>>>>>>>>
>>>>>>>> FLINK-3320 <https://issues.apache.org/jira/browse/FLINK-3320> (CEP
>>>>>>>> "not" operator) does not address this because again, how would the "not
>>>>>>>> match" be triggered if no event at all occurs?
>>>>>>>>
>>>>>>>> On Sat, Oct 8, 2016 at 12:50 AM, <lg...@yahoo.com> wrote:
>>>>>>>>
>>>>>>>> The following is a better link:
>>>>>>>>
>>>>>>>> <http://mail-archives.apache.org/mod_mbox/flink-user/201609.mbox/%3CCAC27z%3DOTtv7USYUm82bE43-DkoGfVC4UAWD6uQwwRgTsE5be8g%40mail.gmail.com%3E>
>>>>>>>>
>>>>>>>>
>>>>>>>> - LF
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> ------------------------------
>>>>>>>> *From:* "lgfmt@yahoo.com" <lg...@yahoo.com>
>>>>>>>> *To:* "user@flink.apache.org" <us...@flink.apache.org>
>>>>>>>> *Sent:* Friday, October 7, 2016 3:36 PM
>>>>>>>>
>>>>>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>>>>>
>>>>>>>> Doesn't the upcoming CEP negation (absence of an event) feature
>>>>>>>> solve this issue?
>>>>>>>>
>>>>>>>> See this discussion thread:
>>>>>>>> <http://mail-archives.apache.org/mod_mbox/flink-user/201609.mbox/%3CCAC27z%3DOD%2BTq8twBw_1YKni5sWAU3g1S9WDpJw0DUwgiG9YX9Fg%40mail.gmail.com%3E>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> //  Atul
>>>>>>>>
>>>>>>>>
>>>>>>>> ------------------------------
>>>>>>>> *From:* Till Rohrmann <tr...@apache.org>
>>>>>>>> *To:* user@flink.apache.org
>>>>>>>> *Sent:* Friday, October 7, 2016 12:58 AM
>>>>>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>>>>>
>>>>>>>> Hi David,
>>>>>>>>
>>>>>>>> in case of event time, the timeout will be detected when the first
>>>>>>>> watermark exceeding the timeout value is received. Thus, it depends a
>>>>>>>> little bit how you generate watermarks (e.g. periodically, watermark per
>>>>>>>> event).
>>>>>>>>
>>>>>>>> In case of processing time, the time is only updated whenever a new
>>>>>>>> element arrives. Thus, if you have an element arriving 4 seconds after
>>>>>>>> Event A, it should detect the timeout. If the next event arrives 20 seconds
>>>>>>>> later, then you won't see the timeout until then.
>>>>>>>>
>>>>>>>> In the case of processing time, we could think about registering
>>>>>>>> timeout timers for processing time. However, I would highly recommend you
>>>>>>>> to use event time, because with processing time, Flink cannot guarantee
>>>>>>>> meaningful computations, because the events might arrive out of order.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Till
>>>>>>>>
>>>>>>>> On Thu, Oct 6, 2016 at 3:08 PM, David Koch <og...@googlemail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> With Flink CEP, is there a way to actively listen to pattern
>>>>>>>> matches that time out? I am under the impression that this is not possible.
>>>>>>>>
>>>>>>>> In my case I partition a stream containing user web navigation by
>>>>>>>> "userId" to look for sequences of Event A, followed by B within 4 seconds
>>>>>>>> for each user.
>>>>>>>>
>>>>>>>> I registered a PatternTimeoutFunction which assuming a non-match
>>>>>>>> only fires upon the first event after the specified timeout. For example,
>>>>>>>> given user X: Event A, 20 seconds later Event B (or any other type of
>>>>>>>> event).
>>>>>>>>
>>>>>>>> I'd rather have a notification fire directly upon the 4 second
>>>>>>>> interval expiring since passive invalidation is not really applicable in my
>>>>>>>> case.
>>>>>>>>
>>>>>>>> How, if at all can this be achieved with Flink CEP?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> David

Re: Listening to timed-out patterns in Flink CEP

Posted by David Koch <og...@googlemail.com>.
Hello,

Thanks for the code Sameer. Unfortunately, it didn't solve the issue.
Compared to what I did the principle is the same - make sure that the
watermark advances even without events present to trigger timeouts in CEP
patterns.

If Till or anyone else could provide a minimal example illustrating the
supposed behaviour of:

> [CEP] timeout will be detected when the first watermark exceeding the
> timeout value is received


I'd very much appreciate it.

Regards,

David
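
For reference, the rule quoted above can be modelled in a few lines of plain Java. This is only a sketch of the semantics as Till described them, not Flink's internal code: the class `TimeoutModel`, the method `firstTimeoutWatermark`, and the `>=` comparison are assumptions made for illustration. It shows why a watermark that never advances can never reveal the timeout, while an advancing one reveals it as soon as it reaches the start timestamp plus the window.

```java
import java.util.OptionalLong;

public class TimeoutModel {

    /**
     * Returns the first watermark at which a partial match that started at
     * startTs would be considered timed out for a window of windowMs, or
     * empty if no watermark in the sequence gets that far.
     * Assumption: the timeout condition is (watermark - startTs >= windowMs).
     */
    public static OptionalLong firstTimeoutWatermark(long startTs, long windowMs, long[] watermarks) {
        for (long wm : watermarks) {
            if (wm - startTs >= windowMs) {
                return OptionalLong.of(wm);
            }
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        long startTs = 0L;      // Event A
        long windowMs = 4000L;  // "within 4 seconds"

        // Watermark stuck at the last event: the timeout is never seen.
        System.out.println(firstTimeoutWatermark(startTs, windowMs, new long[] {0L, 0L, 0L, 0L}));

        // Watermark advances by 1 s per period: the timeout is seen at 4000.
        System.out.println(firstTimeoutWatermark(startTs, windowMs, new long[] {1000L, 2000L, 3000L, 4000L, 5000L}));

        // Watermark only moves when Event B arrives 20 s later.
        System.out.println(firstTimeoutWatermark(startTs, windowMs, new long[] {0L, 0L, 20000L}));
    }
}
```

The third case is the behaviour described at the start of this thread: the timeout is only noticed once the next event pushes the watermark forward.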


On Wed, Oct 12, 2016 at 1:54 AM, Sameer W <sa...@axiomine.com> wrote:

> Try this. Your watermarks (WMs) need to move forward. Also, don't use the
> system timestamp. Use the timestamp of the elements seen as the reference,
> since the elements most likely lag the system timestamp.
>
> DataStream<Event> withTimestampsAndWatermarks = tuples
>         .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Event>() {
>
>             long waterMarkTmst;
>             long lastEmittedWM = 0;
>
>             @Override
>             public long extractTimestamp(Event element, long previousElementTimestamp) {
>                 if (element.tmst > lastEmittedWM) {
>                     // Assumes increasing timestamps. Subtract 1 because more
>                     // elements with the same timestamp might still arrive.
>                     waterMarkTmst = element.tmst - 1;
>                 }
>                 return element.tmst;
>             }
>
>             @Override
>             public Watermark getCurrentWatermark() {
>                 if (lastEmittedWM == waterMarkTmst) {
>                     // No new event seen: move the WM forward by the auto
>                     // watermark interval (watermarks only move forward in time).
>                     waterMarkTmst = waterMarkTmst + 1000L;
>                 }
>                 lastEmittedWM = waterMarkTmst;
>
>                 System.out.println(String.format("Watermark at %s", new Date(waterMarkTmst)));
>                 // Until an event is seen, the WM starts at 0 and advances by
>                 // 1000 ms per call.
>                 return new Watermark(waterMarkTmst);
>             }
>         }).keyBy("key");
>
> On Tue, Oct 11, 2016 at 7:29 PM, David Koch <og...@googlemail.com> wrote:
>
>> Hello,
>>
>> I tried setting the watermark to System.currentTimeMillis() - 5000L,
>> event timestamps are System.currentTimeMillis(). I do not observe the
>> expected behaviour of the PatternTimeoutFunction firing once the watermark
>> moves past the timeout "anchored" by a pattern match.
>>
>> Here is the complete test class source <http://pastebin.com/9WxGq2wv>,
>> in case someone is interested. The timestamp/watermark assigner looks like
>> this:
>>
>> DataStream<Event> withTimestampsAndWatermarks = tuples
>>         .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Event>() {
>>
>>             long waterMarkTmst;
>>
>>             @Override
>>             public long extractTimestamp(Event element, long previousElementTimestamp) {
>>                 return element.tmst;
>>             }
>>
>>             @Override
>>             public Watermark getCurrentWatermark() {
>>                 waterMarkTmst = System.currentTimeMillis() - 5000L;
>>                 System.out.println(String.format("Watermark at %s", new Date(waterMarkTmst)));
>>                 return new Watermark(waterMarkTmst);
>>             }
>>         }).keyBy("key");
>>
>> withTimestampsAndWatermarks.getExecutionConfig().setAutoWatermarkInterval(1000L);
>>
>> // Apply pattern filtering on stream.
>> PatternStream<Event> patternStream = CEP.pattern(withTimestampsAndWatermarks, pattern);
>>
>> Any idea what's wrong?
>>
>> David
>>
>>
>> On Tue, Oct 11, 2016 at 10:20 PM, Sameer W <sa...@axiomine.com> wrote:
>>
>>> Assuming an element with timestamp which is later than the last emitted
>>> watermark arrives, would it just be dropped because the PatternStream does
>>> not have a max allowed lateness method? In that case it appears that CEP
>>> cannot handle late events yet out of the box.
>>>
>>> If we do want to support late events can we chain a
>>> keyBy().timeWindow().allowedLateness(x).map().assignTimestampsAndWatermarks().keyBy()
>>> again before handing it to the CEP operator. This way we may have the
>>> patterns fired multiple times but it allows an event to be late and out of
>>> order. It looks like it will work but is there a less convoluted way.
>>>
>>> Thanks,
>>> Sameer
>>>
>>> On Tue, Oct 11, 2016 at 12:17 PM, Till Rohrmann <till.rohrmann@gmail.com
>>> > wrote:
>>>
>>>> But then no element later than the last emitted watermark must be
>>>> issued by the sources. If that is the case, then this solution should work.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Oct 11, 2016 at 4:50 PM, Sameer W <sa...@axiomine.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> If you know that the events are arriving in order and a consistent
>>>>> lag, why not just increment the watermark time every time the
>>>>> getCurrentWatermark() method is invoked based on the autoWatermarkInterval
>>>>> (or less to be conservative).
>>>>>
>>>>> You can check if the watermark has changed since the arrival of the
>>>>> last event and if not increment it in the getCurrentWatermark() method.
>>>>> Otherwise the watermark will never increase until an element arrives, and if
>>>>> the stream partition stalls for some reason the whole pipeline freezes.
>>>>>
>>>>> Sameer
>>>>>
>>>>>
>>>>> On Tue, Oct 11, 2016 at 6:04 AM, Till Rohrmann <
>>>>> till.rohrmann@gmail.com> wrote:
>>>>>
>>>>>> Hi David,
>>>>>>
>>>>>> the problem is still that there is no corresponding watermark saying
>>>>>> that 4 seconds have now passed. With your code, watermarks will be
>>>>>> periodically emitted but the same watermark will be emitted until a new
>>>>>> element arrives which will reset the watermark. Thus, the system can never
>>>>>> know until this watermark is seen whether there will be an earlier event or
>>>>>> not. I fear that this is a fundamental problem with stream processing.
>>>>>>
>>>>>> You're right that the negation operator won't solve the problem. It
>>>>>> will indeed suffer from the same problem.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Sun, Oct 9, 2016 at 7:37 PM, <lg...@yahoo.com> wrote:
>>>>>>
>>>>>>> >>FLINK-3320 <https://issues.apache.org/jira/browse/FLINK-3320> (CEP
>>>>>>> "not" operator) does not address this because again, how would the "not
>>>>>>> match" be triggered if no event at all occurs?
>>>>>>>
>>>>>>> Good question.
>>>>>>>
>>>>>>> I'm not sure whether the following will work:
>>>>>>>
>>>>>>> This could be done by creating a CEP matching pattern that uses both
>>>>>>> of "notNext" (or "notFollowedBy") and "within" constructs. Something like
>>>>>>> this:
>>>>>>>
>>>>>>> Pattern<Event, ?> pattern = Pattern.<Event>begin("first")
>>>>>>>     .notNext("second")
>>>>>>>     .within(Time.seconds(3));
>>>>>>>
>>>>>>> I'm hoping Flink CEP experts (Till?) will comment on this.
>>>>>>>
>>>>>>> Note: I have requested these negation patterns to be implemented in
>>>>>>> Flink CEP, but notNext/notFollowedBy are not yet implemented in Flink..
>>>>>>>
>>>>>>>
>>>>>>> - LF
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> ------------------------------
>>>>>>> *From:* David Koch <og...@googlemail.com>
>>>>>>> *To:* user@flink.apache.org; lgfmt@yahoo.com
>>>>>>> *Sent:* Sunday, October 9, 2016 5:51 AM
>>>>>>>
>>>>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> Thank you for the explanation as well as the link to the other post.
>>>>>>> Interesting to learn about some of the open JIRAs.
>>>>>>>
>>>>>>> Indeed, I was not using event time, but processing time. However,
>>>>>>> even when using event time I only get notified of timeouts upon subsequent
>>>>>>> events.
>>>>>>>
>>>>>>> The link <http://pastebin.com/x4m3RHQz> contains an example where I
>>>>>>> read <key> <value> from a socket, wrap this in a custom "event" with
>>>>>>> timestamp, key the resultant stream by <key> and attempt to detect <key>
>>>>>>> instances no further than 3 seconds apart using CEP.
>>>>>>>
>>>>>>> Apart from the fact that results are only printed when I close the
>>>>>>> socket (normal?) I don't observe any change in behaviour
>>>>>>>
>>>>>>> So event-time/watermarks or not: SOME event has to occur for the
>>>>>>> timeout to be triggered.
>>>>>>>
>>>>>>> FLINK-3320 <https://issues.apache.org/jira/browse/FLINK-3320> (CEP
>>>>>>> "not" operator) does not address this because again, how would the "not
>>>>>>> match" be triggered if no event at all occurs?
>>>>>>>
>>>>>>> On Sat, Oct 8, 2016 at 12:50 AM, <lg...@yahoo.com> wrote:
>>>>>>>
>>>>>>> The following is a better link:
>>>>>>>
>>>>>>> <http://mail-archives.apache.org/mod_mbox/flink-user/201609.mbox/%3CCAC27z%3DOTtv7USYUm82bE43-DkoGfVC4UAWD6uQwwRgTsE5be8g%40mail.gmail.com%3E>
>>>>>>>
>>>>>>>
>>>>>>> - LF
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> ------------------------------
>>>>>>> *From:* "lgfmt@yahoo.com" <lg...@yahoo.com>
>>>>>>> *To:* "user@flink.apache.org" <us...@flink.apache.org>
>>>>>>> *Sent:* Friday, October 7, 2016 3:36 PM
>>>>>>>
>>>>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>>>>
>>>>>>> Doesn't the upcoming CEP negation (absence of an event) feature solve
>>>>>>> this issue?
>>>>>>>
>>>>>>> See this discussion thread:
>>>>>>> <http://mail-archives.apache.org/mod_mbox/flink-user/201609.mbox/%3CCAC27z%3DOD%2BTq8twBw_1YKni5sWAU3g1S9WDpJw0DUwgiG9YX9Fg%40mail.gmail.com%3E>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> //  Atul
>>>>>>>
>>>>>>>
>>>>>>> ------------------------------
>>>>>>> *From:* Till Rohrmann <tr...@apache.org>
>>>>>>> *To:* user@flink.apache.org
>>>>>>> *Sent:* Friday, October 7, 2016 12:58 AM
>>>>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>>>>
>>>>>>> Hi David,
>>>>>>>
>>>>>>> in case of event time, the timeout will be detected when the first
>>>>>>> watermark exceeding the timeout value is received. Thus, it depends a
>>>>>>> little bit how you generate watermarks (e.g. periodically, watermark per
>>>>>>> event).
>>>>>>>
>>>>>>> In case of processing time, the time is only updated whenever a new
>>>>>>> element arrives. Thus, if you have an element arriving 4 seconds after
>>>>>>> Event A, it should detect the timeout. If the next event arrives 20 seconds
>>>>>>> later, then you won't see the timeout until then.
>>>>>>>
>>>>>>> In the case of processing time, we could think about registering
>>>>>>> timeout timers for processing time. However, I would highly recommend you
>>>>>>> to use event time, because with processing time, Flink cannot guarantee
>>>>>>> meaningful computations, because the events might arrive out of order.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Thu, Oct 6, 2016 at 3:08 PM, David Koch <og...@googlemail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> With Flink CEP, is there a way to actively listen to pattern matches
>>>>>>> that time out? I am under the impression that this is not possible.
>>>>>>>
>>>>>>> In my case I partition a stream containing user web navigation by
>>>>>>> "userId" to look for sequences of Event A, followed by B within 4 seconds
>>>>>>> for each user.
>>>>>>>
>>>>>>> I registered a PatternTimeoutFunction which assuming a non-match
>>>>>>> only fires upon the first event after the specified timeout. For example,
>>>>>>> given user X: Event A, 20 seconds later Event B (or any other type of
>>>>>>> event).
>>>>>>>
>>>>>>> I'd rather have a notification fire directly upon the 4 second
>>>>>>> interval expiring since passive invalidation is not really applicable in my
>>>>>>> case.
>>>>>>>
>>>>>>> How, if at all can this be achieved with Flink CEP?
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> David

Re: Listening to timed-out patterns in Flink CEP

Posted by Sameer W <sa...@axiomine.com>.
Try this. Your watermarks (WMs) need to move forward. Also, don't use the
system timestamp. Use the timestamp of the elements seen as the reference,
since the elements most likely lag the system timestamp.

DataStream<Event> withTimestampsAndWatermarks = tuples
        .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Event>() {

            long waterMarkTmst;
            long lastEmittedWM = 0;

            @Override
            public long extractTimestamp(Event element, long previousElementTimestamp) {
                if (element.tmst > lastEmittedWM) {
                    // Assumes increasing timestamps. Subtract 1 because more
                    // elements with the same timestamp might still arrive.
                    waterMarkTmst = element.tmst - 1;
                }
                return element.tmst;
            }

            @Override
            public Watermark getCurrentWatermark() {
                if (lastEmittedWM == waterMarkTmst) {
                    // No new event seen: move the WM forward by the auto
                    // watermark interval (watermarks only move forward in time).
                    waterMarkTmst = waterMarkTmst + 1000L;
                }
                lastEmittedWM = waterMarkTmst;

                System.out.println(String.format("Watermark at %s", new Date(waterMarkTmst)));
                // Until an event is seen, the WM starts at 0 and advances by
                // 1000 ms per call.
                return new Watermark(waterMarkTmst);
            }
        }).keyBy("key");
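
The effect of this assigner can be checked without a Flink cluster. The following is a minimal plain-Java sketch, not Flink code: the class `IdleAdvancingWatermark` and its method names are made up for illustration, the 1000 ms step is assumed to equal the auto watermark interval, and a timeout is assumed to be visible once the watermark reaches the event timestamp plus the timeout. It replays the same two-field logic and counts how many periodic calls pass before a 4-second timeout after a single event would become visible.

```java
public class IdleAdvancingWatermark {

    private long waterMarkTmst = 0L;
    private long lastEmittedWM = 0L;
    private final long stepMs;

    public IdleAdvancingWatermark(long stepMs) {
        this.stepMs = stepMs;
    }

    /** Mirrors extractTimestamp(): remember the newest event time minus 1 ms. */
    public long onEvent(long eventTs) {
        if (eventTs > lastEmittedWM) {
            waterMarkTmst = eventTs - 1;
        }
        return eventTs;
    }

    /** Mirrors getCurrentWatermark(): advance by stepMs when no new event was seen. */
    public long nextWatermark() {
        if (lastEmittedWM == waterMarkTmst) {
            waterMarkTmst += stepMs;
        }
        lastEmittedWM = waterMarkTmst;
        return waterMarkTmst;
    }

    /** Number of periodic calls after one event until watermark >= eventTs + timeoutMs. */
    public static int callsUntilTimeout(long eventTs, long timeoutMs, long stepMs) {
        IdleAdvancingWatermark wm = new IdleAdvancingWatermark(stepMs);
        wm.onEvent(eventTs);
        int calls = 0;
        long current;
        do {
            current = wm.nextWatermark();
            calls++;
        } while (current < eventTs + timeoutMs);
        return calls;
    }

    public static void main(String[] args) {
        // One event at t = 10 s, 4 s timeout, 1 s watermark step.
        System.out.println(callsUntilTimeout(10_000L, 4_000L, 1_000L)); // prints 6
    }
}
```

The emitted watermarks are 9999, 10999, 11999, 12999, 13999 and 14999, so in this model the sixth periodic call is the first one at or past 14000. As Till notes, this only holds if the source never emits an element older than the last emitted watermark.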

On Tue, Oct 11, 2016 at 7:29 PM, David Koch <og...@googlemail.com> wrote:

> Hello,
>
> I tried setting the watermark to System.currentTimeMillis() - 5000L, event
> timestamps are System.currentTimeMillis(). I do not observe the expected
> behaviour of the PatternTimeoutFunction firing once the watermark moves
> past the timeout "anchored" by a pattern match.
>
> Here is the complete test class source <http://pastebin.com/9WxGq2wv>, in
> case someone is interested. The timestamp/watermark assigner looks like
> this:
>
> DataStream<Event> withTimestampsAndWatermarks = tuples
>         .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Event>() {
>
>             long waterMarkTmst;
>
>             @Override
>             public long extractTimestamp(Event element, long previousElementTimestamp) {
>                 return element.tmst;
>             }
>
>             @Override
>             public Watermark getCurrentWatermark() {
>                 waterMarkTmst = System.currentTimeMillis() - 5000L;
>                 System.out.println(String.format("Watermark at %s", new Date(waterMarkTmst)));
>                 return new Watermark(waterMarkTmst);
>             }
>         }).keyBy("key");
>
> withTimestampsAndWatermarks.getExecutionConfig().setAutoWatermarkInterval(1000L);
>
> // Apply pattern filtering on stream.
> PatternStream<Event> patternStream = CEP.pattern(withTimestampsAndWatermarks, pattern);
>
> Any idea what's wrong?
>
> David
>
>
> On Tue, Oct 11, 2016 at 10:20 PM, Sameer W <sa...@axiomine.com> wrote:
>
>> Assuming an element with timestamp which is later than the last emitted
>> watermark arrives, would it just be dropped because the PatternStream does
>> not have a max allowed lateness method? In that case it appears that CEP
>> cannot handle late events yet out of the box.
>>
>> If we do want to support late events can we chain a
>> keyBy().timeWindow().allowedLateness(x).map().assignTimestampsAndWatermarks().keyBy()
>> again before handing it to the CEP operator. This way we may have the
>> patterns fired multiple times but it allows an event to be late and out of
>> order. It looks like it will work but is there a less convoluted way.
>>
>> Thanks,
>> Sameer
>>
>> On Tue, Oct 11, 2016 at 12:17 PM, Till Rohrmann <ti...@gmail.com>
>> wrote:
>>
>>> But then no element later than the last emitted watermark must be issued
>>> by the sources. If that is the case, then this solution should work.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Oct 11, 2016 at 4:50 PM, Sameer W <sa...@axiomine.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> If you know that the events are arriving in order and a consistent lag,
>>>> why not just increment the watermark time every time the
>>>> getCurrentWatermark() method is invoked based on the autoWatermarkInterval
>>>> (or less to be conservative).
>>>>
>>>> You can check if the watermark has changed since the arrival of the
>>>> last event and if not increment it in the getCurrentWatermark() method.
>>>> Otherwise the watermark will never increase until an element arrives, and if
>>>> the stream partition stalls for some reason the whole pipeline freezes.
>>>>
>>>> Sameer
>>>>
>>>>
>>>> On Tue, Oct 11, 2016 at 6:04 AM, Till Rohrmann <till.rohrmann@gmail.com
>>>> > wrote:
>>>>
>>>>> Hi David,
>>>>>
>>>>> the problem is still that there is no corresponding watermark saying
>>>>> that 4 seconds have now passed. With your code, watermarks will be
>>>>> periodically emitted but the same watermark will be emitted until a new
>>>>> element arrives which will reset the watermark. Thus, the system can never
>>>>> know until this watermark is seen whether there will be an earlier event or
>>>>> not. I fear that this is a fundamental problem with stream processing.
>>>>>
>>>>> You're right that the negation operator won't solve the problem. It
>>>>> will indeed suffer from the same problem.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Sun, Oct 9, 2016 at 7:37 PM, <lg...@yahoo.com> wrote:
>>>>>
>>>>>> >>FLINK-3320 <https://issues.apache.org/jira/browse/FLINK-3320> (CEP
>>>>>> "not" operator) does not address this because again, how would the "not
>>>>>> match" be triggered if no event at all occurs?
>>>>>>
>>>>>> Good question.
>>>>>>
>>>>>> I'm not sure whether the following will work:
>>>>>>
>>>>>> This could be done by creating a CEP matching pattern that uses both
>>>>>> of "notNext" (or "notFollowedBy") and "within" constructs. Something like
>>>>>> this:
>>>>>>
>>>>>> Pattern<Event, ?> pattern = Pattern.<Event>begin("first")
>>>>>>     .notNext("second")
>>>>>>     .within(Time.seconds(3));
>>>>>>
>>>>>> I'm hoping Flink CEP experts (Till?) will comment on this.
>>>>>>
>>>>>> Note: I have requested these negation patterns to be implemented in
>>>>>> Flink CEP, but notNext/notFollowedBy are not yet implemented in Flink..
>>>>>>
>>>>>>
>>>>>> - LF
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> ------------------------------
>>>>>> *From:* David Koch <og...@googlemail.com>
>>>>>> *To:* user@flink.apache.org; lgfmt@yahoo.com
>>>>>> *Sent:* Sunday, October 9, 2016 5:51 AM
>>>>>>
>>>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> Thank you for the explanation as well as the link to the other post.
>>>>>> Interesting to learn about some of the open JIRAs.
>>>>>>
>>>>>> Indeed, I was not using event time, but processing time. However,
>>>>>> even when using event time I only get notified of timeouts upon subsequent
>>>>>> events.
>>>>>>
>>>>>> The link <http://pastebin.com/x4m3RHQz> contains an example where I
>>>>>> read <key> <value> from a socket, wrap this in a custom "event" with
>>>>>> timestamp, key the resultant stream by <key> and attempt to detect <key>
>>>>>> instances no further than 3 seconds apart using CEP.
>>>>>>
>>>>>> Apart from the fact that results are only printed when I close the
>>>>>> socket (normal?), I don't observe any change in behaviour.
>>>>>>
>>>>>> So event-time/watermarks or not: SOME event has to occur for the
>>>>>> timeout to be triggered.
>>>>>>
>>>>>> FLINK-3320 <https://issues.apache.org/jira/browse/FLINK-3320> (CEP
>>>>>> "not" operator) does not address this because again, how would the "not
>>>>>> match" be triggered if no event at all occurs?
>>>>>>
>>>>>> On Sat, Oct 8, 2016 at 12:50 AM, <lg...@yahoo.com> wrote:
>>>>>>
>>>>>> The following is a better link:
>>>>>>
>>>>>> http://mail-archives.apache.org/mod_mbox/flink-user/201609.mbox/%3CCAC27z%3DOTtv7USYUm82bE43-DkoGfVC4UAWD6uQwwRgTsE5be8g%40mail.gmail.com%3E
>>>>>>
>>>>>>
>>>>>> - LF
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> ------------------------------
>>>>>> *From:* "lgfmt@yahoo.com" <lg...@yahoo.com>
>>>>>> *To:* "user@flink.apache.org" <us...@flink.apache.org>
>>>>>> *Sent:* Friday, October 7, 2016 3:36 PM
>>>>>>
>>>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>>>
>>>>>> Doesn't the upcoming CEP negation (absence of an event) feature solve
>>>>>> this issue?
>>>>>>
>>>>>> See this discussion thread:
>>>>>> http://mail-archives.apache.org/mod_mbox/flink-user/201609.mbox/%3CCAC27z%3DOD%2BTq8twBw_1YKni5sWAU3g1S9WDpJw0DUwgiG9YX9Fg%40mail.gmail.com%3E
>>>>>>
>>>>>>
>>>>>>
>>>>>> //  Atul
>>>>>>
>>>>>>
>>>>>> ------------------------------
>>>>>> *From:* Till Rohrmann <tr...@apache.org>
>>>>>> *To:* user@flink.apache.org
>>>>>> *Sent:* Friday, October 7, 2016 12:58 AM
>>>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>>>
>>>>>> Hi David,
>>>>>>
>>>>>> in case of event time, the timeout will be detected when the first
>>>>>> watermark exceeding the timeout value is received. Thus, it depends a
>>>>>> little bit how you generate watermarks (e.g. periodically, watermark per
>>>>>> event).
>>>>>>
>>>>>> In case of processing time, the time is only updated whenever a new
>>>>>> element arrives. Thus, if you have an element arriving 4 seconds after
>>>>>> Event A, it should detect the timeout. If the next event arrives 20 seconds
>>>>>> later, then you won't see the timeout until then.
>>>>>>
>>>>>> In the case of processing time, we could think about registering
>>>>>> timeout timers for processing time. However, I would highly recommend you
>>>>>> to use event time, because with processing time, Flink cannot guarantee
>>>>>> meaningful computations, because the events might arrive out of order.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Thu, Oct 6, 2016 at 3:08 PM, David Koch <og...@googlemail.com>
>>>>>> wrote:
>>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> With Flink CEP, is there a way to actively listen to pattern matches
>>>>>> that time out? I am under the impression that this is not possible.
>>>>>>
>>>>>> In my case I partition a stream containing user web navigation by
>>>>>> "userId" to look for sequences of Event A, followed by B within 4 seconds
>>>>>> for each user.
>>>>>>
>>>>>> I registered a PatternTimeoutFunction which, assuming a non-match, only
>>>>>> fires upon the first event after the specified timeout. For example, given
>>>>>> user X: Event A, 20 seconds later Event B (or any other type of event).
>>>>>>
>>>>>> I'd rather have a notification fire directly upon the 4 second
>>>>>> interval expiring since passive invalidation is not really applicable in my
>>>>>> case.
>>>>>>
>>>>>> How, if at all, can this be achieved with Flink CEP?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> David
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Listening to timed-out patterns in Flink CEP

Posted by David Koch <og...@googlemail.com>.
Hello,

I tried setting the watermark to System.currentTimeMillis() - 5000L; event
timestamps are System.currentTimeMillis(). However, I do not observe the
expected behaviour of the PatternTimeoutFunction firing once the watermark
moves past the timeout "anchored" by a pattern match.

Here is the complete test class source <http://pastebin.com/9WxGq2wv>, in
case someone is interested. The timestamp/watermark assigner looks like
this:

DataStream<Event> withTimestampsAndWatermarks = tuples
        .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Event>() {

            long waterMarkTmst;

            @Override
            public long extractTimestamp(Event element, long previousElementTimestamp) {
                return element.tmst;
            }

            @Override
            public Watermark getCurrentWatermark() {
                waterMarkTmst = System.currentTimeMillis() - 5000L;
                System.out.println(String.format("Watermark at %s", new Date(waterMarkTmst)));
                return new Watermark(waterMarkTmst);
            }
        }).keyBy("key");

withTimestampsAndWatermarks.getExecutionConfig().setAutoWatermarkInterval(1000L);

// Apply pattern filtering on stream.
PatternStream<Event> patternStream = CEP.pattern(withTimestampsAndWatermarks, pattern);

Any idea what's wrong?

David
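
For reference, the behaviour expected above can be modelled without Flink.
The following is a minimal plain-Java sketch, not the CEP operator's
implementation: TimeoutModel and its methods are hypothetical names, and it
assumes that a timeout is reported by the first watermark that exceeds the
start of a partial match plus the "within" window.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy model of event-time timeout detection. This is NOT Flink's CEP
// operator; all names here are hypothetical and for illustration only.
final class TimeoutModel {
    private final long withinMillis;
    // Event-time start timestamps of partial matches still waiting for
    // their next event.
    private final List<Long> pendingStarts = new ArrayList<>();

    TimeoutModel(long withinMillis) {
        this.withinMillis = withinMillis;
    }

    // An event matched the first step of the pattern at this event time.
    void onFirstEvent(long eventTimeMillis) {
        pendingStarts.add(eventTimeMillis);
    }

    // A watermark arrived. Returns the start timestamps of all partial
    // matches whose window is exceeded by the watermark, and forgets them.
    List<Long> onWatermark(long watermarkMillis) {
        List<Long> timedOut = new ArrayList<>();
        Iterator<Long> it = pendingStarts.iterator();
        while (it.hasNext()) {
            long start = it.next();
            if (watermarkMillis > start + withinMillis) {
                timedOut.add(start);
                it.remove();
            }
        }
        return timedOut;
    }
}
```

Under these assumptions, with watermarks trailing wall-clock time by 5
seconds and a window of, say, 3 seconds, a timeout could be reported no
earlier than about 8 seconds after the first event, and only if watermarks
keep arriving at the operator.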


On Tue, Oct 11, 2016 at 10:20 PM, Sameer W <sa...@axiomine.com> wrote:

> Assuming an element arrives whose timestamp is behind the last emitted
> watermark (a late element), would it just be dropped because the
> PatternStream does not have a max allowed lateness method? In that case it
> appears that CEP cannot yet handle late events out of the box.
>
> If we do want to support late events, can we chain a
> keyBy().timeWindow().allowedLateness(x).map().assignTimestampsAndWatermarks().keyBy()
> again before handing it to the CEP operator? This way the patterns may
> fire multiple times, but it allows an event to be late and out of order. It
> looks like it will work, but is there a less convoluted way?
>
> Thanks,
> Sameer
>
> On Tue, Oct 11, 2016 at 12:17 PM, Till Rohrmann <ti...@gmail.com>
> wrote:
>
>> But then the sources must not emit any element that is late with respect
>> to the last emitted watermark. If that is the case, then this solution
>> should work.
>>
>> Cheers,
>> Till
>>
>> On Tue, Oct 11, 2016 at 4:50 PM, Sameer W <sa...@axiomine.com> wrote:
>>
>>> Hi,
>>>
>>> If you know that the events are arriving in order and with a consistent
>>> lag, why not just increment the watermark time every time the
>>> getCurrentWatermark() method is invoked, based on the
>>> autoWatermarkInterval (or less, to be conservative)?
>>>
>>> You can check whether the watermark has changed since the arrival of the
>>> last event and, if not, increment it in the getCurrentWatermark() method.
>>> Otherwise the watermark will never increase until an element arrives, and
>>> if the stream partition stalls for some reason the whole pipeline freezes.
>>>
>>> Sameer
>>>
>>>
>>> On Tue, Oct 11, 2016 at 6:04 AM, Till Rohrmann <ti...@gmail.com>
>>> wrote:
>>>
>>>> Hi David,
>>>>
>>>> the problem is still that there is no corresponding watermark saying
>>>> that 4 seconds have now passed. With your code, watermarks will be
>>>> periodically emitted but the same watermark will be emitted until a new
>>>> element arrives which will reset the watermark. Thus, the system can never
>>>> know until this watermark is seen whether there will be an earlier event or
>>>> not. I fear that this is a fundamental problem with stream processing.
>>>>
>>>> You're right that the negation operator won't solve the problem. It
>>>> will indeed suffer from the same problem.
>>>>
>>>> Cheers,
>>>> Till
>>>>

Re: Listening to timed-out patterns in Flink CEP

Posted by Sameer W <sa...@axiomine.com>.
Assuming an element arrives whose timestamp is behind the last emitted
watermark (a late element), would it just be dropped because the
PatternStream does not have a max allowed lateness method? In that case it
appears that CEP cannot yet handle late events out of the box.

If we do want to support late events, can we chain a
keyBy().timeWindow().allowedLateness(x).map().assignTimestampsAndWatermarks().keyBy()
again before handing it to the CEP operator? This way the patterns may
fire multiple times, but it allows an event to be late and out of order. It
looks like it will work, but is there a less convoluted way?

Thanks,
Sameer
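
To make the term "late" concrete, here is a small plain-Java sketch of how
an element can be classified against the current watermark. The names are
hypothetical and the rule is a simplified model of the usual event-time
convention; it says nothing about what the CEP operator actually does with
late elements.

```java
// Simplified model of lateness relative to a watermark. Hypothetical names;
// this is not Flink API and not the CEP operator's behaviour.
final class LatenessCheck {
    enum Verdict { ON_TIME, LATE_BUT_ALLOWED, DROPPED }

    // A watermark W asserts that no further elements with timestamp <= W
    // are expected. An element at or below W is therefore late; it is still
    // accepted here if it falls within the allowed lateness.
    static Verdict classify(long elementTimestamp, long watermark,
                            long allowedLatenessMillis) {
        if (elementTimestamp > watermark) {
            return Verdict.ON_TIME;
        }
        if (elementTimestamp + allowedLatenessMillis > watermark) {
            return Verdict.LATE_BUT_ALLOWED;
        }
        return Verdict.DROPPED;
    }
}
```

With an allowed lateness of zero, which is the situation asked about above,
every element at or below the watermark falls into the DROPPED case of this
model.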

On Tue, Oct 11, 2016 at 12:17 PM, Till Rohrmann <ti...@gmail.com>
wrote:

> But then the sources must not emit any element that is late with respect
> to the last emitted watermark. If that is the case, then this solution
> should work.
>
> Cheers,
> Till
>
> On Tue, Oct 11, 2016 at 4:50 PM, Sameer W <sa...@axiomine.com> wrote:
>
>> Hi,
>>
>> If you know that the events are arriving in order and with a consistent
>> lag, why not just increment the watermark time every time the
>> getCurrentWatermark() method is invoked, based on the autoWatermarkInterval
>> (or less, to be conservative)?
>>
>> You can check whether the watermark has changed since the arrival of the
>> last event and, if not, increment it in the getCurrentWatermark() method.
>> Otherwise the watermark will never increase until an element arrives, and
>> if the stream partition stalls for some reason the whole pipeline freezes.
>>
>> Sameer
>>
>>
>> On Tue, Oct 11, 2016 at 6:04 AM, Till Rohrmann <ti...@gmail.com>
>> wrote:
>>
>>> Hi David,
>>>
>>> the problem is still that there is no corresponding watermark saying
>>> that 4 seconds have now passed. With your code, watermarks will be
>>> periodically emitted but the same watermark will be emitted until a new
>>> element arrives which will reset the watermark. Thus, the system can never
>>> know until this watermark is seen whether there will be an earlier event or
>>> not. I fear that this is a fundamental problem with stream processing.
>>>
>>> You're right that the negation operator won't solve the problem. It will
>>> indeed suffer from the same problem.
>>>
>>> Cheers,
>>> Till
>>>

Re: Listening to timed-out patterns in Flink CEP

Posted by David Koch <og...@googlemail.com>.
I will give it a try. My current timestamp/watermark assigner extends
AscendingTimestampExtractor, so I can't override its behaviour of setting
the watermark to the last seen event timestamp.

Thanks for your replies.

/David
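
The approach suggested in the quoted reply below (let the watermark keep
advancing while no events arrive) can be sketched in plain Java. This is an
illustration under assumptions, not Flink API: IdleAwareWatermarks and its
methods are hypothetical names, wall-clock time is passed in explicitly, and
events are assumed to arrive in order with a lag no larger than lagMillis.

```java
// Plain-Java sketch of a watermark that advances with wall-clock time while
// the stream is idle. Hypothetical names; not part of Flink.
final class IdleAwareWatermarks {
    private final long lagMillis;
    private long maxEventTime = Long.MIN_VALUE;
    private long wallClockAtMaxEvent;
    private long lastWatermark = Long.MIN_VALUE;

    IdleAwareWatermarks(long lagMillis) {
        this.lagMillis = lagMillis;
    }

    // Record an event's timestamp and the wall-clock time it was seen at.
    void onEvent(long eventTimeMillis, long wallClockMillis) {
        if (eventTimeMillis > maxEventTime) {
            maxEventTime = eventTimeMillis;
            wallClockAtMaxEvent = wallClockMillis;
        }
    }

    // Called periodically. While no new events arrive, the watermark grows
    // by the wall-clock time elapsed since the newest event was seen.
    long currentWatermark(long wallClockMillis) {
        if (maxEventTime == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // no event seen yet
        }
        long idleFor = Math.max(0L, wallClockMillis - wallClockAtMaxEvent);
        long candidate = maxEventTime + idleFor - lagMillis;
        // Never let the reported watermark move backwards.
        lastWatermark = Math.max(lastWatermark, candidate);
        return lastWatermark;
    }
}
```

In a Flink job this logic would presumably live in the getCurrentWatermark()
method of a custom periodic assigner such as AssignerWithPeriodicWatermarks.
Any event that later arrives with a timestamp at or below the advanced
watermark would then count as late.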

On Tue, Oct 11, 2016 at 6:17 PM, Till Rohrmann <ti...@gmail.com>
wrote:

> But then the sources must not emit any element that is late with respect
> to the last emitted watermark. If that is the case, then this solution
> should work.
>
> Cheers,
> Till
>
> On Tue, Oct 11, 2016 at 4:50 PM, Sameer W <sa...@axiomine.com> wrote:
>
>> Hi,
>>
>> If you know that the events are arriving in order and with a consistent
>> lag, why not just increment the watermark time every time the
>> getCurrentWatermark() method is invoked, based on the autoWatermarkInterval
>> (or less, to be conservative)?
>>
>> You can check whether the watermark has changed since the arrival of the
>> last event and, if not, increment it in the getCurrentWatermark() method.
>> Otherwise the watermark will never increase until an element arrives, and
>> if the stream partition stalls for some reason the whole pipeline freezes.
>>
>> Sameer
>>
>>
>> On Tue, Oct 11, 2016 at 6:04 AM, Till Rohrmann <ti...@gmail.com>
>> wrote:
>>
>>> Hi David,
>>>
>>> the problem is still that there is no corresponding watermark saying
>>> that 4 seconds have now passed. With your code, watermarks will be
>>> periodically emitted but the same watermark will be emitted until a new
>>> element arrives which will reset the watermark. Thus, the system can never
>>> know until this watermark is seen whether there will be an earlier event or
>>> not. I fear that this is a fundamental problem with stream processing.
>>>
>>> You're right that the negation operator won't solve the problem. It will
>>> indeed suffer from the same problem.
>>>
>>> Cheers,
>>> Till
>>>

Re: Listening to timed-out patterns in Flink CEP

Posted by Till Rohrmann <ti...@gmail.com>.
But then the sources must not emit any element that is late with respect to
the last emitted watermark. If that is the case, then this solution should
work.

Cheers,
Till

On Tue, Oct 11, 2016 at 4:50 PM, Sameer W <sa...@axiomine.com> wrote:

> Hi,
>
> If you know that the events are arriving in order and with a consistent
> lag, why not just increment the watermark time every time the
> getCurrentWatermark() method is invoked, based on the autoWatermarkInterval
> (or less, to be conservative)?
>
> You can check whether the watermark has changed since the arrival of the
> last event and, if not, increment it in the getCurrentWatermark() method.
> Otherwise the watermark will never increase until an element arrives, and
> if the stream partition stalls for some reason the whole pipeline freezes.
>
> Sameer
>
>
> On Tue, Oct 11, 2016 at 6:04 AM, Till Rohrmann <ti...@gmail.com>
> wrote:
>
>> Hi David,
>>
>> the problem is still that there is no corresponding watermark saying that
>> 4 seconds have now passed. With your code, watermarks will be periodically
>> emitted but the same watermark will be emitted until a new element arrives
>> which will reset the watermark. Thus, the system can never know until this
>> watermark is seen whether there will be an earlier event or not. I fear
>> that this is a fundamental problem with stream processing.
>>
>> You're right that the negation operator won't solve the problem. It will
>> indeed suffer from the same problem.
>>
>> Cheers,
>> Till
>>
>> On Sun, Oct 9, 2016 at 7:37 PM, <lg...@yahoo.com> wrote:
>>
>>> >>FLINK-3320 <https://issues.apache.org/jira/browse/FLINK-3320> (CEP
>>> "not" operator) does not address this because again, how would the "not
>>> match" be triggered if no event at all occurs?
>>>
>>> Good question.
>>>
>>> I'm not sure whether the following will work:
>>>
>>> This could be done by creating a CEP matching pattern that uses both of
>>> "notNext" (or "notFollowedBy") and "within" constructs. Something like this:
>>>
>>> Pattern<Event, ?> pattern = Pattern.<Event>begin("first")
>>>     .notNext("second")
>>>     .within(Time.seconds(3));
>>>
>>> I'm hoping Flink CEP experts (Till?) will comment on this.
>>>
>>> Note: I have requested these negation patterns to be implemented in
>>> Flink CEP, but notNext/notFollowedBy are not yet implemented in Flink.
>>>
>>>
>>> - LF
>>>
>>>
>>>
>>>
>>> ------------------------------
>>> *From:* David Koch <og...@googlemail.com>
>>> *To:* user@flink.apache.org; lgfmt@yahoo.com
>>> *Sent:* Sunday, October 9, 2016 5:51 AM
>>>
>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>
>>> Hello,
>>>
>>> Thank you for the explanation as well as the link to the other post.
>>> Interesting to learn about some of the open JIRAs.
>>>
>>> Indeed, I was not using event time, but processing time. However, even
>>> when using event time I only get notified of timeouts upon subsequent
>>> events.
>>>
>>> The link <http://pastebin.com/x4m3RHQz> contains an example where I
>>> read <key> <value> from a socket, wrap this in a custom "event" with
>>> timestamp, key the resultant stream by <key> and attempt to detect <key>
>>> instances no further than 3 seconds apart using CEP.
>>>
>>> Apart from the fact that results are only printed when I close the
>>> socket (normal?) I don't observe any change in behaviour
>>>
>>> So event-time/watermarks or not: SOME event has to occur for the timeout
>>> to be triggered.
>>>
>>> FLINK-3320 <https://issues.apache.org/jira/browse/FLINK-3320> (CEP
>>> "not" operator) does not address this because again, how would the "not
>>> match" be triggered if no event at all occurs?
>>>
>>> On Sat, Oct 8, 2016 at 12:50 AM, <lg...@yahoo.com> wrote:
>>>
>>> The following is a better link:
>>>
>>> http://mail-archives.apache. org/mod_mbox/flink-user/
>>> 201609.mbox/%3CCAC27z% 3DOTtv7USYUm82bE43- DkoGfVC4UAWD6uQwwRgTsE5be8g%
>>> 40mail.gmail.com%3E
>>> <http://mail-archives.apache.org/mod_mbox/flink-user/201609.mbox/%3CCAC27z%3DOTtv7USYUm82bE43-DkoGfVC4UAWD6uQwwRgTsE5be8g%40mail.gmail.com%3E>
>>>
>>>
>>> - LF
>>>
>>>
>>>
>>>
>>> ------------------------------
>>> *From:* "lgfmt@yahoo.com" <lg...@yahoo.com>
>>> *To:* "user@flink.apache.org" <us...@flink.apache.org>
>>> *Sent:* Friday, October 7, 2016 3:36 PM
>>>
>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>
>>> Isn't the upcoming CEP negation (absence of an event) feature solve
>>> this issue?
>>>
>>> See this discussion thread:
>>> http://mail-archives.apache. org/mod_mbox/flink-user/
>>> 201609.mbox/%3CCAC27z%3DOD% 2BTq8twBw_ 1YKni5sWAU3g1S9WDpJw0DUwgiG9YX
>>> 9Fg%40mail.gmail.com%3E
>>> <http://mail-archives.apache.org/mod_mbox/flink-user/201609.mbox/%3CCAC27z%3DOD%2BTq8twBw_1YKni5sWAU3g1S9WDpJw0DUwgiG9YX9Fg%40mail.gmail.com%3E>
>>>
>>>
>>>
>>> //  Atul
>>>
>>>
>>> ------------------------------
>>> *From:* Till Rohrmann <tr...@apache.org>
>>> *To:* user@flink.apache.org
>>> *Sent:* Friday, October 7, 2016 12:58 AM
>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>
>>> Hi David,
>>>
>>> in case of event time, the timeout will be detected when the first
>>> watermark exceeding the timeout value is received. Thus, it depends a
>>> little bit how you generate watermarks (e.g. periodically, watermark per
>>> event).
>>>
>>> In case of processing time, the time is only updated whenever a new
>>> element arrives. Thus, if you have an element arriving 4 seconds after
>>> Event A, it should detect the timeout. If the next event arrives 20 seconds
>>> later, than you won't see the timeout until then.
>>>
>>> In the case of processing time, we could think about registering timeout
>>> timers for processing time. However, I would highly recommend you to use
>>> event time, because with processing time, Flink cannot guarantee meaningful
>>> computations, because the events might arrive out of order.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Oct 6, 2016 at 3:08 PM, David Koch <og...@googlemail.com>
>>> wrote:
>>>
>>> Hello,
>>>
>>> With Flink CEP, is there a way to actively listen to pattern matches
>>> that time out? I am under the impression that this is not possible.
>>>
>>> In my case I partition a stream containing user web navigation by
>>> "userId" to look for sequences of Event A, followed by B within 4 seconds
>>> for each user.
>>>
>>> I registered a PatternTimeoutFunction which assuming a non-match only
>>> fires upon the first event after the specified timeout. For example, given
>>> user X: Event A, 20 seconds later Event B (or any other type of event).
>>>
>>> I'd rather have a notification fire directly upon the 4 second interval
>>> expiring since passive invalidation is not really applicable in my case.
>>>
>>> How, if at all can this be achieved with Flink CEP?
>>>
>>> Thanks,
>>>
>>> David
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>

Re: Listening to timed-out patterns in Flink CEP

Posted by Sameer W <sa...@axiomine.com>.
Hi,

If you know that the events arrive in order and with a consistent lag, why
not just advance the watermark every time the getCurrentWatermark() method
is invoked, by the autoWatermarkInterval (or by less, to be conservative)?

You can check whether the watermark has changed since the arrival of the
last event and, if not, increment it in the getCurrentWatermark() method.
Otherwise the watermark will never increase until an element arrives, and if
the stream partition stalls for some reason, the whole pipeline freezes.
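
A minimal sketch of this idea in plain Java, kept free of Flink dependencies
so that it can be compiled on its own. The class name IdleAdvancingWatermark
and the onEvent() method are hypothetical; in a real job the same logic
would presumably live in a periodic watermark assigner whose
getCurrentWatermark() is called once per autoWatermarkInterval. It assumes
in-order events with a consistent lag, as stated above; without that
assumption, advancing the watermark by guesswork can turn later events into
late events.

```java
// Sketch only: advances the watermark by a fixed step whenever no event
// arrived between two consecutive getCurrentWatermark() calls.
class IdleAdvancingWatermark {
    private final long intervalMillis;              // assumed auto-watermark interval
    private long currentWatermark = Long.MIN_VALUE; // nothing seen yet
    private boolean sawEventSinceLastCall = false;

    IdleAdvancingWatermark(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    // Called for every element; timestamps are assumed to be in order.
    void onEvent(long timestampMillis) {
        if (timestampMillis > currentWatermark) {
            currentWatermark = timestampMillis;
        }
        sawEventSinceLastCall = true;
    }

    // Called periodically. If the stream was idle since the previous call,
    // pretend that intervalMillis of event time has passed.
    long getCurrentWatermark() {
        if (!sawEventSinceLastCall && currentWatermark != Long.MIN_VALUE) {
            currentWatermark += intervalMillis;
        }
        sawEventSinceLastCall = false;
        return currentWatermark;
    }
}
```

With an interval of 200 ms, an event at timestamp 1000 followed by silence
yields watermarks 1000, 1200, 1400 and so on, so a 4 second timeout would be
reached after roughly 4 seconds of idleness instead of waiting for the next
event.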

Sameer


On Tue, Oct 11, 2016 at 6:04 AM, Till Rohrmann <ti...@gmail.com>
wrote:

> Hi David,
>
> the problem is still that there is no corresponding watermark saying that
> 4 seconds have now passed. With your code, watermarks will be periodically
> emitted but the same watermark will be emitted until a new element arrives
> which will reset the watermark. Thus, the system can never know until this
> watermark is seen whether there will be an earlier event or not. I fear
> that this is a fundamental problem with stream processing.
>
> You're right that the negation operator won't solve the problem. It will
> indeed suffer from the same problem.
>
> Cheers,
> Till

Re: Listening to timed-out patterns in Flink CEP

Posted by Till Rohrmann <ti...@gmail.com>.
Hi David,

the problem is still that there is no corresponding watermark saying that 4
seconds have now passed. With your code, watermarks will be periodically
emitted but the same watermark will be emitted until a new element arrives
which will reset the watermark. Thus, the system can never know until this
watermark is seen whether there will be an earlier event or not. I fear
that this is a fundamental problem with stream processing.

You're right that the negation operator won't solve the problem. It will
indeed suffer from the same problem.

Cheers,
Till

On Sun, Oct 9, 2016 at 7:37 PM, <lg...@yahoo.com> wrote:

> >>FLINK-3320 <https://issues.apache.org/jira/browse/FLINK-3320> (CEP
> "not" operator) does not address this because again, how would the "not
> match" be triggered if no event at all occurs?
>
> Good question.
>
> I'm not sure whether the following will work:
>
> This could be done by creating a CEP matching pattern that uses both of
> "notNext" (or "notFollowedBy") and "within" constructs. Something like this:
>
> Pattern<Event, ?> pattern = Pattern.<Event>begin("first")
>     .notNext("second")
>     .within(Time.seconds(3));
>
> I'm hoping Flink CEP experts (Till?) will comment on this.
>
> Note: I have requested that these negation patterns be implemented in
> Flink CEP, but notNext/notFollowedBy are not yet implemented in Flink.
>
>
> - LF

Re: Listening to timed-out patterns in Flink CEP

Posted by lg...@yahoo.com.
> FLINK-3320 (CEP "not" operator) does not address this because again, how
> would the "not match" be triggered if no event at all occurs?

Good question.

I'm not sure whether the following will work:

This could be done by creating a CEP pattern that uses both the "notNext"
(or "notFollowedBy") and "within" constructs. Something like this:

Pattern<Event, ?> pattern = Pattern.<Event>begin("first")
    .notNext("second")
    .within(Time.seconds(3));

I'm hoping Flink CEP experts (Till?) will comment on this.

Note: I have requested that these negation patterns be implemented in Flink
CEP, but notNext/notFollowedBy are not yet implemented in Flink.


- LF

Re: Listening to timed-out patterns in Flink CEP

Posted by David Koch <og...@googlemail.com>.
Hello,

Thank you for the explanation as well as the link to the other post.
Interesting to learn about some of the open JIRAs.

Indeed, I was not using event time, but processing time. However, even when
using event time I only get notified of timeouts upon subsequent events.

The link <http://pastebin.com/x4m3RHQz> contains an example where I read
<key> <value> from a socket, wrap this in a custom "event" with timestamp,
key the resultant stream by <key> and attempt to detect <key> instances no
further than 3 seconds apart using CEP.

Apart from the fact that results are only printed when I close the socket
(normal?), I don't observe any change in behaviour.

So event-time/watermarks or not: SOME event has to occur for the timeout to
be triggered.

FLINK-3320 <https://issues.apache.org/jira/browse/FLINK-3320> (CEP "not"
operator) does not address this because again, how would the "not match" be
triggered if no event at all occurs?

On Sat, Oct 8, 2016 at 12:50 AM, <lg...@yahoo.com> wrote:

> The following is a better link:
>
> http://mail-archives.apache.org/mod_mbox/flink-user/201609.mbox/%3CCAC27z%3DOTtv7USYUm82bE43-DkoGfVC4UAWD6uQwwRgTsE5be8g%40mail.gmail.com%3E
>
>
> - LF

Re: Listening to timed-out patterns in Flink CEP

Posted by lg...@yahoo.com.
The following is a better link:

http://mail-archives.apache.org/mod_mbox/flink-user/201609.mbox/%3CCAC27z%3DOTtv7USYUm82bE43-DkoGfVC4UAWD6uQwwRgTsE5be8g%40mail.gmail.com%3E


- LF

Re: Listening to timed-out patterns in Flink CEP

Posted by lg...@yahoo.com.
Won't the upcoming CEP negation (absence of an event) feature solve this issue?

See this discussion thread:
http://mail-archives.apache.org/mod_mbox/flink-user/201609.mbox/%3CCAC27z%3DOD%2BTq8twBw_1YKni5sWAU3g1S9WDpJw0DUwgiG9YX9Fg%40mail.gmail.com%3E

 //  Atul

Re: Listening to timed-out patterns in Flink CEP

Posted by Till Rohrmann <tr...@apache.org>.
Hi David,

in case of event time, the timeout will be detected when the first
watermark exceeding the timeout value is received. Thus, it depends a
little bit on how you generate watermarks (e.g. periodically, watermark per
event).
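
To make the event-time case concrete, here is a rough sketch in plain Java.
It has no Flink dependency, the class and method names are made up for
illustration, and it is not how the CEP operator is actually implemented. It
only shows the principle: a partial match "A seen, waiting for B" carries a
deadline, and that deadline is checked only when a watermark arrives. The
window length and the ">=" comparison are assumptions of the sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Sketch only: tracks "A seen, waiting for B" per user in event time.
class EventTimeTimeouts {
    private final long windowMillis;
    // userId -> event-time deadline (timestamp of A plus the window)
    private final Map<String, Long> deadlines = new HashMap<>();

    EventTimeTimeouts(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    void onEventA(String userId, long timestampMillis) {
        deadlines.put(userId, timestampMillis + windowMillis);
    }

    // Returns true if B completes a pending match within the window.
    boolean onEventB(String userId, long timestampMillis) {
        Long deadline = deadlines.get(userId);
        if (deadline != null && timestampMillis < deadline) {
            deadlines.remove(userId);
            return true;
        }
        return false;
    }

    // Timeouts are only discovered here, i.e. when a watermark arrives.
    List<String> onWatermark(long watermarkMillis) {
        List<String> timedOut = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = deadlines.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (watermarkMillis >= entry.getValue()) {
                timedOut.add(entry.getKey());
                it.remove();
            }
        }
        return timedOut;
    }
}
```

If the watermark is derived only from the timestamps of incoming elements,
onWatermark() keeps being called with the same value while the stream is
idle, so the pending match for that user is never reported as timed out
until some later event pushes the watermark past the deadline.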

In case of processing time, the time is only updated whenever a new element
arrives. Thus, if you have an element arriving 4 seconds after Event A, it
should detect the timeout. If the next event arrives 20 seconds later, then
you won't see the timeout until then.

In the case of processing time, we could think about registering timeout
timers for processing time. However, I would highly recommend that you use
event time, because with processing time, Flink cannot guarantee meaningful
computations, because the events might arrive out of order.
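
For illustration only, registering processing-time timeout timers could look
roughly like the following plain-Java sketch. It uses the JDK's
ScheduledExecutorService rather than anything from Flink, all names are
hypothetical, and it ignores checkpointing and fault tolerance. The point is
that the timeout callback fires on wall-clock time, whether or not another
event arrives.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Sketch only: fires a timeout per user if B does not follow A in time.
class ProcessingTimeTimeouts {
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();
    // userId -> token identifying the most recent pending A for that user
    private final Map<String, Object> pending = new ConcurrentHashMap<>();
    private final long windowMillis;
    private final Consumer<String> onTimeout;

    ProcessingTimeTimeouts(long windowMillis, Consumer<String> onTimeout) {
        this.windowMillis = windowMillis;
        this.onTimeout = onTimeout;
    }

    void onEventA(String userId) {
        Object token = new Object();
        pending.put(userId, token);
        // The timer fires after windowMillis of wall-clock time. It reports
        // a timeout only if this A is still the pending one for the user.
        timer.schedule(() -> {
            if (pending.remove(userId, token)) {
                onTimeout.accept(userId);
            }
        }, windowMillis, TimeUnit.MILLISECONDS);
    }

    // Returns true if B completes a pending match; the timer then becomes a no-op.
    boolean onEventB(String userId) {
        return pending.remove(userId) != null;
    }

    void shutdown() {
        timer.shutdownNow();
    }
}
```

The caveat from above still applies: such timers measure arrival time, so
delayed or out-of-order events can produce timeouts that would not occur in
event time.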

Cheers,
Till

On Thu, Oct 6, 2016 at 3:08 PM, David Koch <og...@googlemail.com> wrote:

> Hello,
>
> With Flink CEP, is there a way to actively listen to pattern matches that
> time out? I am under the impression that this is not possible.
>
> In my case I partition a stream containing user web navigation by "userId"
> to look for sequences of Event A, followed by B within 4 seconds for each
> user.
>
> I registered a PatternTimeoutFunction which assuming a non-match only
> fires upon the first event after the specified timeout. For example, given
> user X: Event A, 20 seconds later Event B (or any other type of event).
>
> I'd rather have a notification fire directly upon the 4 second interval
> expiring since passive invalidation is not really applicable in my case.
>
> How, if at all can this be achieved with Flink CEP?
>
> Thanks,
>
> David
>
>