Posted to user@flink.apache.org by Manu Zhang <ow...@gmail.com> on 2016/10/24 04:30:24 UTC

watermark trigger doesn't check whether element's timestamp is passed

Hi,

Say I have a window state of List(("a", 1:00), ("b", 1:03), ("c", 1:06))
which is triggered to emit when watermark passes the timestamp of an
element. For example,

on watermark(1:01), List(("a", 1:00)) is emitted
on watermark(1:04), List(("a", 1:00), ("b", 1:03)) is emitted
on watermark(1:07), List(("a", 1:00), ("b", 1:03), ("c", 1:06)) is emitted

It seems that if *("c", 1:06) is processed before watermark(1:04)*,
List(("a", 1:00), ("b", 1:03), ("c", 1:06)) will be emitted on
watermark(1:04). This is incorrect, since elements with timestamps
between 1:04 and 1:06 may not have arrived yet.

I guess this is because the watermark trigger doesn't check whether an
element's timestamp has been passed.
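
The difference can be sketched outside Flink. Below is a minimal plain-Java
simulation, not Flink API: the class `WatermarkFilterSketch`, its `Element`
type, and both method names are made up for illustration, and timestamps are
written as minutes past 1:00. `emitAll` mirrors the behaviour described above
(a firing emits the whole buffer), while `emitPassed` applies the missing
check, assuming Flink's convention that a watermark t means no further
elements with timestamp <= t are expected.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation for illustration; none of these names are Flink API.
class WatermarkFilterSketch {

    // A buffered window element: a value and its event time (minutes past 1:00).
    static final class Element {
        final String value;
        final long timestamp;

        Element(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Behaviour reported in this thread: a firing emits the entire buffer.
    static List<String> emitAll(List<Element> buffer) {
        List<String> out = new ArrayList<>();
        for (Element e : buffer) {
            out.add(e.value);
        }
        return out;
    }

    // Desired behaviour: emit only elements whose timestamp the watermark has passed.
    static List<String> emitPassed(List<Element> buffer, long watermark) {
        List<String> out = new ArrayList<>();
        for (Element e : buffer) {
            if (e.timestamp <= watermark) {
                out.add(e.value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // ("a", 1:00), ("b", 1:03), ("c", 1:06), with "c" buffered before watermark(1:04)
        List<Element> buffer = Arrays.asList(
                new Element("a", 0), new Element("b", 3), new Element("c", 6));
        System.out.println(emitAll(buffer));        // [a, b, c]
        System.out.println(emitPassed(buffer, 4));  // [a, b]
    }
}
```

With ("c", 1:06) already buffered, `emitAll` returns all three values at
watermark(1:04), whereas `emitPassed(buffer, 4)` holds "c" back until a later
watermark passes it.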

Please correct me if any of the above is not right.

Thanks,
Manu Zhang

Re: watermark trigger doesn't check whether element's timestamp is passed

Posted by Manu Zhang <ow...@gmail.com>.
Thanks, that will be great. I'd like to test against my particular use
cases once your PR is available.

Manu

On Wed, Nov 2, 2016 at 11:09 PM Ventura Del Monte <ve...@gmail.com>
wrote:

> Hello,
>
> I have just opened the JIRA issue
> <https://issues.apache.org/jira/browse/FLINK-4997> and I have almost
> completed the implementation of this feature. I will keep you posted :)
>
> Cheers,
> Ventura

Re: watermark trigger doesn't check whether element's timestamp is passed

Posted by Ventura Del Monte <ve...@gmail.com>.
Hello,

I have just opened the JIRA issue
<https://issues.apache.org/jira/browse/FLINK-4997> and I have almost
completed the implementation of this feature. I will keep you posted :)

Cheers,
Ventura



This message, for the D. Lgs n. 196/2003 (Privacy Code), may contain
confidential and/or privileged information. If you are not the addressee or
authorized to receive this for the addressee, you must not use, copy,
disclose or take any action based on this message or any information
herein. If you have received this message in error, please advise the
sender immediately by reply e-mail and delete this message. Thank you for
your cooperation.

On Wed, Nov 2, 2016 at 2:18 PM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
> a contributor (Bonaventure Del Monte) has started working on this. He
> should open a Jira this week.
>
> Cheers,
> Aljoscha

Re: watermark trigger doesn't check whether element's timestamp is passed

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
a contributor (Bonaventure Del Monte) has started working on this. He
should open a Jira this week.

Cheers,
Aljoscha

On Tue, 1 Nov 2016 at 23:57 aj heller <dr...@gmail.com> wrote:

Hi Manu, Aljoscha,

I had been interested in implementing FLIP-2, but I haven't been able to
make time for it. There is no implementation yet that I'm aware of, and
I'll gladly step aside (or help out how I can) if you or anyone is
interested to take charge of it.

That said, I'm also not sure if discussions are ongoing. I had hoped to
prototype the proposal as is, to have something more concrete to discuss.

Cheers,
aj

Re: watermark trigger doesn't check whether element's timestamp is passed

Posted by aj heller <dr...@gmail.com>.
Hi Manu, Aljoscha,

I had been interested in implementing FLIP-2, but I haven't been able to
make time for it. There is no implementation yet that I'm aware of, and
I'll gladly step aside (or help out how I can) if you or anyone is
interested to take charge of it.

That said, I'm also not sure if discussions are ongoing. I had hoped to
prototype the proposal as is, to have something more concrete to discuss.

Cheers,
aj
On Nov 1, 2016 3:24 PM, "Manu Zhang" <ow...@gmail.com> wrote:

> Thanks. The ideal case is to fire after the watermark passes each element
> in the window, but that requires a custom trigger and FLIP-2 as well. The
> enhanced window evictor will help to avoid the last firing.
>
> Are the discussions on FLIP-2 still going on?
> Are there any open JIRAs or PRs? (The proposed `ProcessWindowFunction`
> will be sufficient for my case.)
> Is there a workaround now for my case?
>
> Thanks again for following through this.
> Manu

Re: watermark trigger doesn't check whether element's timestamp is passed

Posted by Manu Zhang <ow...@gmail.com>.
Thanks. The ideal case is to fire after the watermark passes each element
in the window, but that requires a custom trigger and FLIP-2 as well. The
enhanced window evictor will help to avoid the last firing.

Are the discussions on FLIP-2 still going on?
Are there any open JIRAs or PRs? (The proposed `ProcessWindowFunction`
will be sufficient for my case.)
Is there a workaround now for my case?

Thanks again for following through this.
Manu

On Wed, Nov 2, 2016 at 1:07 AM Aljoscha Krettek <al...@apache.org> wrote:

> Ah, I finally understand it. You would like a way to query the current
> watermark in the window function to only emit those elements where the
> timestamp is lower than the watermark.
>
> When the window fires again, do you want to emit elements that you emitted
> during the last firing again? If not, I think you also need to use an
> evictor to evict the elements from the window where the timestamp is lower
> than the watermark. With this FLIP
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata we
> should be able to extend the WindowFunction Context to also provide the
> current watermark. With this recent PR
> https://github.com/apache/flink/pull/2736 you would be able to evict
> elements from the window state after the window function was called.
>
> Cheers,
> Aljoscha
>
> On Tue, 1 Nov 2016 at 02:27 Manu Zhang <ow...@gmail.com> wrote:
>
> Yes, here's the example
> https://github.com/manuzhang/flink/blob/pv/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/session/PageViewSessionWindowing.scala
>
> If you print and compare the timestamp of timer with that of "PageView" in
> the outputs, you could see what I mean.
>
> I think the recently introduced TimelyFlatMapFunction is close to what I
> want to achieve. It will be great if we can query time information in the
> window function so I filed
> https://issues.apache.org/jira/browse/FLINK-4953
>
> Thanks for your time.
>
> Manu
>
> On Mon, Oct 31, 2016 at 10:29 PM Aljoscha Krettek <al...@apache.org>
> wrote:
>
> Hmm, I don't completely understand what's going on. Could you maybe post
> an example, with the trigger code that shows this behaviour?
>
> Cheers,
> Aljoscha
>
> On Thu, 27 Oct 2016 at 17:12 Manu Zhang <ow...@gmail.com> wrote:
>
> Hi,
>
> It's what I'm seeing. If a timer fires before the end of the window,
> state (in the window) whose timestamp is *after* the timer is also
> emitted. That's a problem for an event-time trigger.
>
> Thanks,
> Manu
>
>
> On Thu, Oct 27, 2016 at 10:29 PM Aljoscha Krettek <al...@apache.org>
> wrote:
>
> Hi,
> is that example input/output what you would like to achieve or what you
> are currently seeing with Flink? I think for your use case a custom Trigger
> would be required that works like the event-time trigger but additionally
> registers timers for each element where you want to emit.
>
> Cheers,
> Aljoscha
>
> On Wed, 26 Oct 2016 at 04:04 Manu Zhang <ow...@gmail.com> wrote:
>
> Hi Aljoscha,
>
> Thanks for your response.  My use case is to track user trajectory based
> on page view event when they visit a website.  The input would be like a
> list of PageView(userId, url, eventTimestamp) with watermarks (=
> eventTimestamp - duration). I'm trying SessionWindows with some event time
> trigger. Note we can't wait for the end of session window due to latency.
> Instead, we want to emit the user trajectories whenever a buffered
> PageView's event time is passed by watermark. I tried
> ContinuousEventTimeTrigger and a custom trigger which sets timer on each
> element's timestamp. For both triggers I've witnessed a problem like the
> following (e.g. a session gap of 5)
>
> PageView("user1", "http://foo", 1)
> PageView("user1", "http://foo/bar", 2)
> Watermark(1) => emit UserTrajectory("user1", "http://foo -> *http://foo/bar*", [1,6])
> PageView("user1", "http://foo/bar/foobar", 5)
> Watermark(4) => emit UserTrajectory("user1", "http://foo -> http://foo/bar -> *http://foo/bar/foobar*", [1, 10])
>
> The URLs in bold should not be included, since events with earlier
> timestamps may not have arrived yet.
>
>
> Thanks,
> Manu
>
>
> On Tue, Oct 25, 2016 at 1:36 AM Aljoscha Krettek <al...@apache.org>
> wrote:
>
> Hi,
> with some additional information we might be able to figure this out
> together. What specific combination of WindowAssigner/Trigger are you using
> for your example and what is the input stream (including watermarks)?
>
> Cheers,
> Aljoscha
>
> On Mon, 24 Oct 2016 at 06:30 Manu Zhang <ow...@gmail.com> wrote:
>
> Hi,
>
> Say I have a window state of List(("a", 1:00), ("b", 1:03), ("c", 1:06))
> which is triggered to emit when watermark passes the timestamp of an
> element. For example,
>
> on watermark(1:01), List(("a", 1:00)) is emitted
> on watermark(1:04), List(("a", 1:00), ("b", 1:03)) is emitted
> on watermark(1:07), List(("a", 1:00), ("b", 1:03), ("c", 1:06)) is emitted
>
> It seems that if *("c", 1:06) is processed before watermark(1:04)*
> List(("a", 1:00), ("b", 1:03), ("c", 1:06)) will be emitted on
> watermark(1:04). This is incorrect since there could be elements with
> timestamp between 1:04 and 1:06 that have not arrived yet.
>
> I guess this is because watermark trigger doesn't check whether element's
> timestamp has been passed.
>
> Please correct me if any of the above is not right.
>
> Thanks,
> Manu Zhang
>
>
>
>

Re: watermark trigger doesn't check whether element's timestamp is passed

Posted by Aljoscha Krettek <al...@apache.org>.
Ah, I finally understand it. You would like a way to query the current
watermark in the window function so that you emit only those elements whose
timestamp is lower than the watermark.

When the window fires again, do you want to emit elements that you emitted
during the last firing again? If not, I think you also need to use an
evictor to evict the elements from the window where the timestamp is lower
than the watermark. With this FLIP
https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata
we should be able to extend the WindowFunction Context to also provide the
current watermark. With this recent PR
https://github.com/apache/flink/pull/2736 you would be able to evict
elements from the window state after the window function was called.
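
A minimal sketch of the filter-then-evict idea described above, written as a
plain-Java simulation with no Flink dependency. The names
WatermarkFilterSketch, Element and fire are hypothetical and are not part of
the Flink API; timestamps are minutes, so 1:00 is written as 60.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Plain-Java simulation; none of these types are Flink classes.
final class WatermarkFilterSketch {

    // Hypothetical buffered element: a value plus its event timestamp.
    static final class Element {
        final String value;
        final long timestamp;

        Element(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Emits the values whose timestamp is lower than the watermark and
    // evicts them from the buffer so a later firing does not repeat them.
    static List<String> fire(List<Element> buffer, long watermark) {
        List<String> emitted = new ArrayList<>();
        Iterator<Element> it = buffer.iterator();
        while (it.hasNext()) {
            Element e = it.next();
            if (e.timestamp < watermark) {
                emitted.add(e.value);
                it.remove();
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Element> buffer = new ArrayList<>();
        buffer.add(new Element("a", 60));
        buffer.add(new Element("b", 63));
        buffer.add(new Element("c", 66)); // arrives before watermark 1:04
        System.out.println(fire(buffer, 64)); // firing at watermark 1:04
        System.out.println(fire(buffer, 67)); // firing at watermark 1:07
    }
}
```

In this model the firing at watermark 1:04 yields a and b even though c is
already buffered, and the firing at 1:07 yields only c because a and b were
evicted. Dropping the it.remove() call gives the cumulative variant, where
earlier elements are emitted again on every firing.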

Cheers,
Aljoscha


Re: watermark trigger doesn't check whether element's timestamp is passed

Posted by Manu Zhang <ow...@gmail.com>.
Yes, here's the example
https://github.com/manuzhang/flink/blob/pv/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/session/PageViewSessionWindowing.scala

If you print and compare the timer's timestamp with that of each "PageView"
in the outputs, you can see what I mean.

I think the recently introduced TimelyFlatMapFunction is close to what I
want to achieve. It would be great if we could query time information in the
window function, so I filed https://issues.apache.org/jira/browse/FLINK-4953

Thanks for your time.

Manu


Re: watermark trigger doesn't check whether element's timestamp is passed

Posted by Aljoscha Krettek <al...@apache.org>.
Hmm, I don't completely understand what's going on. Could you maybe post an
example, with the trigger code that shows this behaviour?

Cheers,
Aljoscha


Re: watermark trigger doesn't check whether element's timestamp is passed

Posted by Manu Zhang <ow...@gmail.com>.
Hi,

It's what I'm seeing. If a timer fires before the end of the window, state
in the window whose timestamp is *after* the timer is also emitted. That's a
problem for an event-time trigger.

Thanks,
Manu



Re: watermark trigger doesn't check whether element's timestamp is passed

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
is that example input/output what you would like to achieve or what you are
currently seeing with Flink? I think for your use case a custom Trigger
would be required that works like the event-time trigger but additionally
registers timers for each element where you want to emit.
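
A rough plain-Java model of such a trigger, with no Flink dependency. The
names PerElementTimerSketch, onElement and onWatermark are hypothetical and
only mimic the shape of a trigger's callbacks. The model assumes that a
timer fires once the watermark reaches its timestamp and that every firing
hands over the whole buffered window content.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Plain-Java simulation; none of these types are Flink classes.
final class PerElementTimerSketch {
    private final TreeSet<Long> timers = new TreeSet<>();
    private final List<Long> bufferedTimestamps = new ArrayList<>();

    // Per-element callback: buffer the element and register a timer
    // at the element's own timestamp.
    void onElement(long timestamp) {
        bufferedTimestamps.add(timestamp);
        timers.add(timestamp);
    }

    // Fires every timer the watermark has reached. Each firing returns a
    // copy of the whole buffer, because the trigger itself only decides
    // when to fire and does not filter the window content.
    List<List<Long>> onWatermark(long watermark) {
        List<List<Long>> firings = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= watermark) {
            timers.pollFirst();
            firings.add(new ArrayList<>(bufferedTimestamps));
        }
        return firings;
    }

    public static void main(String[] args) {
        PerElementTimerSketch trigger = new PerElementTimerSketch();
        trigger.onElement(1);
        trigger.onElement(2);
        System.out.println(trigger.onWatermark(1));
        trigger.onElement(5);
        System.out.println(trigger.onWatermark(4));
    }
}
```

With the page-view timestamps 1, 2 and 5 from the use case quoted in this
thread, the firing at watermark 1 already contains the element with
timestamp 2, and the firing at watermark 4 contains the element with
timestamp 5. The timers decide when to emit, not which elements are emitted.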

Cheers,
Aljoscha


Re: watermark trigger doesn't check whether element's timestamp is passed

Posted by Manu Zhang <ow...@gmail.com>.
Hi Aljoscha,

Thanks for your response. My use case is to track user trajectories based on
page view events when users visit a website. The input would be a list of
PageView(userId, url, eventTimestamp) with watermarks (= eventTimestamp -
duration). I'm trying SessionWindows with an event-time trigger. Note that
we can't wait for the end of the session window because of latency. Instead,
we want to emit the user trajectories whenever a buffered PageView's event
time is passed by the watermark. I tried ContinuousEventTimeTrigger and a
custom trigger which sets a timer on each element's timestamp. For both
triggers I've witnessed a problem like the following (e.g. with a session
gap of 5):

PageView("user1", "http://foo", 1)
PageView("user1", "http://foo/bar", 2)
Watermark(1) => emit UserTrajectory("user1", "http://foo -> *http://foo/bar*", [1,6])
PageView("user1", "http://foo/bar/foobar", 5)
Watermark(4) => emit UserTrajectory("user1", "http://foo -> http://foo/bar -> *http://foo/bar/foobar*", [1, 10])

The URLs in bold should not be included, since events with earlier
timestamps may not have arrived yet.
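
To make the intended output concrete, here is a small plain-Java sketch with
no Flink dependency. TrajectorySketch and trajectory are hypothetical names,
the PageView class only mirrors the fields listed above, and treating an
event time equal to the watermark as "passed" is an assumption read off the
example, where http://foo (timestamp 1) is accepted at Watermark(1).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.StringJoiner;

// Plain-Java simulation; none of these types are Flink classes.
final class TrajectorySketch {

    // Mirrors PageView(userId, url, eventTimestamp) from the message above.
    static final class PageView {
        final String userId;
        final String url;
        final long eventTimestamp;

        PageView(String userId, String url, long eventTimestamp) {
            this.userId = userId;
            this.url = url;
            this.eventTimestamp = eventTimestamp;
        }
    }

    // Joins the URLs of the buffered page views whose event time the
    // watermark has reached, ordered by event time.
    static String trajectory(List<PageView> buffered, long watermark) {
        List<PageView> sorted = new ArrayList<>(buffered);
        sorted.sort(Comparator.comparingLong((PageView p) -> p.eventTimestamp));
        StringJoiner joiner = new StringJoiner(" -> ");
        for (PageView p : sorted) {
            if (p.eventTimestamp <= watermark) {
                joiner.add(p.url);
            }
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        List<PageView> buffered = new ArrayList<>();
        buffered.add(new PageView("user1", "http://foo", 1));
        buffered.add(new PageView("user1", "http://foo/bar", 2));
        System.out.println(trajectory(buffered, 1)); // desired at Watermark(1)
        buffered.add(new PageView("user1", "http://foo/bar/foobar", 5));
        System.out.println(trajectory(buffered, 4)); // desired at Watermark(4)
    }
}
```

Under these assumptions the desired trajectory at Watermark(1) is just
http://foo, and at Watermark(4) it is http://foo -> http://foo/bar, without
the URLs shown in bold above.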


Thanks,
Manu



Re: watermark trigger doesn't check whether element's timestamp is passed

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
with some additional information we might be able to figure this out
together. What specific combination of WindowAssigner/Trigger are you using
for your example and what is the input stream (including watermarks)?

Cheers,
Aljoscha
