Posted to user@storm.apache.org by Stig Rohde Døssing <st...@gmail.com> on 2019/08/01 09:16:48 UTC

Re: Strange time aggregation behavior exhibited by BaseWindowedBolt

Regarding why you need the 5th tuple, it is happening because you are using
timestamp fields. The windowing code will receive the first 4 tuples and
add them to the same window. Until it receives the 5th tuple, there is no
way to tell whether the window is "done", as we might receive more tuples
that fall within the window. The 5th tuple acts as a trigger that tells the
windowing code that the window with the first 4 tuples is now over, and
should be delivered to your bolt.

More specifically, the way it works is that there's a thread running which
periodically (every 10 seconds in your case) sets a watermark. The
watermark is set to be the timestamp of the newest received tuple, minus
the lag. The watermark is then passed on to a trigger policy, which decides
how to generate windows. The windows are generated from the watermark
backwards, so if e.g. your newest tuple timestamp is 10, your lag is 2 (so
the watermark is 8) and your interval is 3, it will try to generate windows
for 0-2, 2-5, 5-8. Note that any tuples with timestamp 9 and 10 aren't
delivered yet, as you've said you expect up to 2 seconds of lag, so it
isn't safe to close the window containing them yet. We can't deliver 9 and
10 until we see a tuple with timestamp 10 plus the lag, so 12.
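
The watermark arithmetic described above can be sketched in a few lines of
Java. This is a simplified model, not Storm's actual implementation: the
class name WatermarkSketch and its methods are hypothetical, timestamps are
plain seconds, and window alignment is ignored, so a tuple counts as
deliverable as soon as its timestamp is at or below the watermark.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simplified, hypothetical model of event-time watermarking; not Storm code.
public class WatermarkSketch {
    private final long lag;
    private final List<Long> pending = new ArrayList<>();
    private boolean seenAny = false;
    private long newestTimestamp;

    public WatermarkSketch(long lag) {
        this.lag = lag;
    }

    // Buffer a tuple timestamp and remember the newest one seen so far.
    public void receive(long timestamp) {
        pending.add(timestamp);
        newestTimestamp = seenAny ? Math.max(newestTimestamp, timestamp) : timestamp;
        seenAny = true;
    }

    // Watermark = newest tuple timestamp minus the lag.
    public long watermark() {
        if (!seenAny) {
            throw new IllegalStateException("no tuples received yet");
        }
        return newestTimestamp - lag;
    }

    // Stands in for the periodic tick: release every buffered timestamp that
    // is at or below the current watermark, in arrival order.
    public List<Long> flush() {
        List<Long> ready = new ArrayList<>();
        if (!seenAny) {
            return ready;
        }
        long wm = watermark();
        Iterator<Long> it = pending.iterator();
        while (it.hasNext()) {
            long ts = it.next();
            if (ts <= wm) {
                ready.add(ts);
                it.remove();
            }
        }
        return ready;
    }

    public static void main(String[] args) {
        WatermarkSketch sketch = new WatermarkSketch(2); // lag of 2
        for (long ts = 1; ts <= 10; ts++) {
            sketch.receive(ts);
        }
        System.out.println("watermark: " + sketch.watermark());
        System.out.println("delivered: " + sketch.flush());
        sketch.receive(12); // a newer tuple moves the watermark forward to 10
        System.out.println("delivered: " + sketch.flush());
    }
}
```

With a lag of 2 and tuples 1 through 10, the first flush releases 1 through
8 and holds back 9 and 10; they are only released once the tuple with
timestamp 12 has arrived. In the test from the original question, the tuple
stamped "now + 1 hour" plays the role of that newer tuple.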

See
https://github.com/apache/storm/blob/21bb1388414d373572779289edc785c7e5aa52aa/storm-client/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java
and
https://github.com/apache/storm/blob/925422a5b5ad1c3329a2c2b44db460ae94f70806/storm-client/src/jvm/org/apache/storm/windowing/WatermarkTimeTriggerPolicy.java

Regarding why your tuples are getting split, I don't know. Are you maybe
running multiple tasks of the windowing bolt?

On Tue, Jul 30, 2019 at 4:11 PM Sandeep Singh <
tosandeepsingh@gmail.com> wrote:

> Sorry for the multiple messages with the same subject; I had to register
> with a different email address.
> To follow up on the thread, can someone explain to me why tuples with the
> same timestamp are sometimes sent in two different time windows? And also
> why sending an extra 5th tuple is required before Storm invokes the execute
> method? Do I need to set a different value for the tumbling window duration
> or lag?
> Thank you in advance for your help,
> Sandeep
>
> On Mon, Jul 29, 2019 at 7:27 PM Sandeep Singh <to...@gmail.com>
> wrote:
>
>> During testing of my topology, which uses Storm's tumbling window, I see
>> strange behavior in how my stream of tuples is handled and split into
>> different time windows.
>>
>> I am using a Tumbling window with duration and lag set to 10 seconds:
>>
>>                 val duration = BaseWindowedBolt.Duration.seconds(10)
>>
>>
>>                 myBolt.withTumblingWindow(duration).withTimestampField("timestampField").withLag(duration)
>>
>>
>>
>> When I send four tuples with the timestamp set to the same value, "now - 1
>> second" (where now = System.currentTimeMillis()), I see log messages showing
>> that Storm is able to extract the time information from the tuples. However,
>> the bolt's "execute(inputWindow: TupleWindow)" method never gets invoked. In
>> my test I wait for 2 minutes. I do not see any log message about late tuples.
>>
>>
>>
>> When I send five tuples, the first four with timestamp "now - 1 second"
>> and the last one with "now + 1 hour", I see that Storm is able to extract
>> all five tuples. However, the execute(inputWindow: TupleWindow) method is
>> either invoked
>>
>>   a) only once, with the first four tuples (the behavior I expected), or
>>
>>   b) twice, the first invocation with tuples 1 & 2 and the second with
>> tuples 3 & 4. Since all four tuples have exactly the same timestamp, I don't
>> understand why the tuples are partitioned into different time windows.
>>
>> Also, the bolt's execute method never gets invoked with the 5th tuple.
>> However, sending the 5th tuple (which is well outside the 10-second window
>> duration) ensures that the execute method is called once or twice for the
>> first four tuples.
>>
>>
>>
>

Re: Strange time aggregation behavior exhibited by BaseWindowedBolt

Posted by Mauro Giusti <ma...@microsoft.com>.
unsubscribe



________________________________
From: Stig Rohde Døssing <st...@gmail.com>
Sent: Thursday, August 1, 2019 5:55 AM
To: user@storm.apache.org <us...@storm.apache.org>
Subject: Re: Strange time aggregation behavior exhibited by BaseWindowedBolt

> Does the watermark ever get generated based on the actual clock and window interval?
No, not when you're using the .withTimestampField method. The watermark gets created on a set (wall clock time) interval, but the watermark value is based on the timestamp values extracted from your tuples. The reason this behavior makes sense is that when you use .withTimestampField, you're essentially saying "use the tuple timestamps to keep track of time, instead of using real wall clock time to keep track of time".

No, if tuples 9 and 10 are the last tuples in the stream, they never get delivered. The real wall clock time doesn't matter when you use .withTimestampField. The only way "time passes" for the watermark generator is if we receive new tuples with newer timestamps. If you want to use real wall clock time instead, you shouldn't call .withTimestampField. In that case, the windowing code would be based on real time instead, so 9 and 10 would get delivered as soon as 3 seconds have passed. The disadvantage to using real time is that processing becomes a little less predictable when e.g. the machine running the processing is slow, or in case you're reprocessing old messages.

> that explains why only first few were getting flushed in time window
Great, happy you found the cause.

Re: Strange time aggregation behavior exhibited by BaseWindowedBolt

Posted by Stig Rohde Døssing <st...@gmail.com>.
> Does the watermark ever get generated based on the actual clock and
> window interval?
No, not when you're using the .withTimestampField method. The watermark
gets created on a set (wall clock time) interval, but the watermark value
is based on the timestamp values extracted from your tuples. The reason
this behavior makes sense is that when you use .withTimestampField, you're
essentially saying "use the tuple timestamps to keep track of time, instead
of using real wall clock time to keep track of time".

No, if tuples 9 and 10 are the last tuples in the stream, they never get
delivered. The real wall clock time doesn't matter when you use
.withTimestampField. The only way "time passes" for the watermark generator
is if we receive new tuples with newer timestamps. If you want to use real
wall clock time instead, you shouldn't call .withTimestampField. In that
case, the windowing code would be based on real time instead, so 9 and 10
would get delivered as soon as 3 seconds have passed. The disadvantage to
using real time is that processing becomes a little less predictable when
e.g. the machine running the processing is slow, or in case you're
reprocessing old messages.
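
The difference between the two time sources can be illustrated with a small
self-contained Java sketch. It is a toy model under stated assumptions, not
Storm code: the class IdleStreamSketch and its methods are hypothetical,
times are in seconds, and the wall-clock case assumes a tuple is simply
delivered once one full window interval has elapsed since it arrived.

```java
// Hypothetical toy model comparing event time with wall-clock time.
public class IdleStreamSketch {

    // Event time: the cutoff depends only on the newest tuple timestamp and
    // the lag. Idle wall-clock time is deliberately not an input.
    public static long eventTimeCutoff(long newestTupleTimestamp, long lag) {
        return newestTupleTimestamp - lag;
    }

    // Event time: a tuple can be delivered once the cutoff reaches its timestamp.
    public static boolean deliveredInEventTime(long tupleTimestamp,
                                               long newestTupleTimestamp,
                                               long lag) {
        return tupleTimestamp <= eventTimeCutoff(newestTupleTimestamp, lag);
    }

    // Wall-clock time (assumed model): a tuple is delivered once a full window
    // interval has elapsed on the wall clock since it arrived.
    public static boolean deliveredInWallClockTime(long arrivalTime,
                                                   long now,
                                                   long interval) {
        return now - arrivalTime >= interval;
    }

    public static void main(String[] args) {
        long lag = 2;
        long interval = 3;
        long newest = 10; // tuple 10 is the last tuple in the stream

        // Let the stream sit idle for 0, 3 and 120 seconds after tuple 10.
        for (long idle : new long[] {0, 3, 120}) {
            boolean eventTime = deliveredInEventTime(10, newest, lag);
            boolean wallClock = deliveredInWallClockTime(10, 10 + idle, interval);
            System.out.println("idle " + idle + "s: event time delivered=" + eventTime
                    + ", wall clock delivered=" + wallClock);
        }
    }
}
```

However long the stream stays idle, the event-time result for tuple 10 does
not change, because nothing moves the newest timestamp forward. In the
wall-clock model the same tuple becomes deliverable once 3 seconds have
passed.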

> that explains why only first few were getting flushed in time window
Great, happy you found the cause.

On Thu, Aug 1, 2019 at 2:04 PM Sandeep Singh <
tosandeepsingh@gmail.com> wrote:

> Thank you very much for the response. Please see my comments inline,
> marked with sandeep>>
>
> On Thu, Aug 1, 2019 at 5:17 AM Stig Rohde Døssing <st...@gmail.com>
> wrote:
>
>> Regarding why you need the 5th tuple, it is happening because you are
>> using timestamp fields. The windowing code will receive the first 4 tuples
>> and add them to the same window. Until it receives the 5th tuple, there is
>> no way to tell whether the window is "done", as we might receive more
>> tuples that fall within the window. The 5th tuple acts as a trigger that
>> tells the windowing code that the window with the first 4 tuples is now
>> over, and should be delivered to your bolt.
>>
>>
>> More specifically, the way it works is that there's a thread running which
>> periodically (every 10 seconds in your case) sets a watermark. The
>> watermark is set to be the timestamp of the newest received tuple, minus
>> the lag. The watermark is then passed on to a trigger policy, which decides
>> how to generate windows. The windows are generated from the watermark
>> backwards, so if e.g. your newest tuple timestamp is 10, your lag is 2 (so
>> the watermark is 8) and your interval is 3, it will try to generate windows
>> for 0-2, 2-5, 5-8. Note that any tuples with timestamp 9 and 10 aren't
>> delivered yet, as you've said you expect up to 2 seconds of lag, so it
>> isn't safe to close the window containing them yet. We can't deliver 9 and
>> 10 until we see a tuple with timestamp 10 plus the lag, so 12.
>>
> sandeep>> Got that. Does the watermark ever get generated based on the
> actual clock and window interval? In the above example, will the tuples
> with timestamps 9 and 10 ever get emitted if tuple 10 was the last tuple in
> the stream and no more tuples arrive for, say, the next 2 minutes?
> I think my confusion was that I expected them to eventually be flushed out
> after (window interval + lag).
>
>
>> See
>> https://github.com/apache/storm/blob/21bb1388414d373572779289edc785c7e5aa52aa/storm-client/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java
>> and
>> https://github.com/apache/storm/blob/925422a5b5ad1c3329a2c2b44db460ae94f70806/storm-client/src/jvm/org/apache/storm/windowing/WatermarkTimeTriggerPolicy.java
>>
>> Regarding why your tuples are getting split, I don't know. Are you maybe
>> running multiple tasks of the windowing bolt?
>>
> sandeep>> I checked the code; I was running only 1 task. However, I was
> using a Kafka spout to receive the messages in my topology, so it was
> possible for tuples with a later timestamp to get processed before others.
> I started sending the messages in blocking mode (waiting for the previous
> send to complete). If the watermark and trigger are based on the order in
> which tuples arrive, that explains why only the first few were getting
> flushed in the time window.
>
>

Re: Strange time aggregation behavior exhibited by BaseWindowedBolt

Posted by Sandeep Singh <to...@gmail.com>.
Thank you very much for the response. Please see my comments inline,
marked with sandeep>>

On Thu, Aug 1, 2019 at 5:17 AM Stig Rohde Døssing <st...@gmail.com>
wrote:

> Regarding why you need the 5th tuple, it is happening because you are
> using timestamp fields. The windowing code will receive the first 4 tuples
> and add them to the same window. Until it receives the 5th tuple, there is
> no way to tell whether the window is "done", as we might receive more
> tuples that fall within the window. The 5th tuple acts as a trigger that
> tells the windowing code that the window with the first 4 tuples is now
> over, and should be delivered to your bolt.
>
>
> More specifically, the way it works is that there's a thread running which
> periodically (every 10 seconds in your case) sets a watermark. The
> watermark is set to be the timestamp of the newest received tuple, minus
> the lag. The watermark is then passed on to a trigger policy, which decides
> how to generate windows. The windows are generated from the watermark
> backwards, so if e.g. your newest tuple timestamp is 10, your lag is 2 (so
> the watermark is 8) and your interval is 3, it will try to generate windows
> for 0-2, 2-5, 5-8. Note that any tuples with timestamp 9 and 10 aren't
> delivered yet, as you've said you expect up to 2 seconds of lag, so it
> isn't safe to close the window containing them yet. We can't deliver 9 and
> 10 until we see a tuple with timestamp 10 plus the lag, so 12.
>
sandeep>> Got that. Does the watermark ever get generated based on the actual
clock and window interval? In the above example, will the tuples with
timestamps 9 and 10 ever get emitted if tuple 10 was the last tuple in the
stream and no more tuples arrive for, say, the next 2 minutes?
I think my confusion was that I expected them to eventually be flushed out
after (window interval + lag).


> See
> https://github.com/apache/storm/blob/21bb1388414d373572779289edc785c7e5aa52aa/storm-client/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java
> and
> https://github.com/apache/storm/blob/925422a5b5ad1c3329a2c2b44db460ae94f70806/storm-client/src/jvm/org/apache/storm/windowing/WatermarkTimeTriggerPolicy.java
>
> Regarding why your tuples are getting split, I don't know. Are you maybe
> running multiple tasks of the windowing bolt?
>
sandeep>> I checked the code; I was running only 1 task. However, I was
using a Kafka spout to receive the messages in my topology, so it was
possible for tuples with a later timestamp to get processed before others.
I started sending the messages in blocking mode (waiting for the previous
send to complete). If the watermark and trigger are based on the order in
which tuples arrive, that explains why only the first few were getting
flushed in the time window.

