You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Stephan Epping <st...@zweitag.de> on 2016/11/10 10:01:07 UTC

Re: Maintaining watermarks per key, instead of per operator instance

Hello,

I found this question in the Nabble archive (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html>) but was unable/dont know how to reply.

Here is my question regarding the mentioned thread:

> Hello, 
> 
> I have similar requirements (see StackOverflor http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data <http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data>). I am pretty new to flink, could you elaborate on a possible solution? We can guarantee good ordering by sensor_id, thus watermarking by key would be the only reasonable way for us (sensorData.keyBy('id').timeWindow(1.minute).sum('value')), could I do my own watermarking aftersensorData.keyBy('id').overwriteWatermarking()... per key? Or maybe using custom state plus a custom trigger? What happens if a sensor dies or is being removed completely, how can this be detected as watermarks would be ignored for window garbage collection. Or could we dynamically schedule a job of each sensor? Which would result in 1000 Jobs.


Thanks,
Stephan



Re: Maintaining watermarks per key, instead of per operator instance

Posted by Stephan Epping <st...@zweitag.de>.
Hey Aljoscha,

that sounds very promising, awesome! Though, I still would need to implement my own window management logic (window assignment and window state purging), right? I was thinking about reusing some of the existing components (TimeWindow) and WindowAssigner, but run my own WindowOperator (aka ProcessFunction). But I am not sure, if that is done easily. I would love to hear your opinion on that, and what the tricky parts will be? For example, common mistakes you experienced in developing the windowing mechanism.

best Stephan


> On 14 Nov 2016, at 19:05, Aljoscha Krettek <al...@apache.org> wrote:
> 
> Hi Stephan,
> I was going to suggest that using a flatMap and tracking the timestamp of each key yourself is a bit like having a per-key watermark. I wanted to wait a bit before answering because I'm currently working on a new type of Function that will be release with Flink 1.2: ProcessFunction. This is somewhat like a FlatMap but also allows to access the element timestamp, query current processing time/event time and set (per key) timers for processing time and event time. With this you should be able to easily implement your per-key tracking, I hope.
> 
> Cheers,
> Aljoscha
> 
> P.S. ProcessFunction is already in the Flink repository but it's called TimelyFlatMapFunction right now, because I was working on it under that working title.
> 
> On Mon, 14 Nov 2016 at 15:47 kaelumania <stephan.epping@zweitag.de <ma...@zweitag.de>> wrote:
> Hey Fabian,
> 
> thank you very much. 
> 
> - yes, I would window by event time and fire/purge by processing time
> - Cheaper in the end meant, that having too much state in the flink cluster would be more expensive, as we store all data in cassandra too.I think the fault tolerance would be okay, as we would make a compare and set with cassandra. 
> 
> With the flatMap Operator wouldn’t it be like running my own windowing mechanism? I need to keep the aggregate window per sensor open (with checkpointing and state management) until I receive an element for a sensor that is later in time than the windows time and then purge the state and emit a new event (which is like having a watermark per sensor). Further, I need a timer that fires like after 24 hours, in case a sensor dies and doesn’t send more data which might is possible with window assigner/trigger, right? But not inside normal functions, e.g. flatMap? We can guarantee that all sensor data per sensor comes almost in order (might be out of order within a few seconds), but there might be gaps of several hours after network partitions.
> 
> There is now way to define/redefine the watermark per keyed stream? Or adjust the window assigner + trigger to achieve the desired behaviour? I am a bit reserved in implementing the whole state management. Do you plan to support such use cases on keyed streams? Maybe the WatermarkAssigner could also receive information about the key for wich the watermark should be calculated etc.
> 
> best, Stephan
> 
> 
> 
>> On 14 Nov 2016, at 15:17, Fabian Hueske-2 [via Apache Flink User Mailing List archive.] <[hidden email] <http://user/SendEmail.jtp?type=node&node=10098&i=0>> wrote:
>> 
> 
>> Hi Stephan,
>> 
>> I'm skeptical about two things: 
>> - using processing time will result in inaccurately bounded aggregates (or do you want to group by event time in a processing time window?)
>> - writing to and reading from Cassandra might be expensive (not sure what you mean by cheaper in the end) and it is not integrated with Flink's checkpointing mechanism for fault-tolerance.
>> 
>> To me, the stateful FlatMapOperator looks like the best approach. There is an upcoming feature for registering timers in user-functions, i.e., a function is called after the timer exceeds. This could be helpful to overcome the problem of closing the window without new data.
>> 
>> Best, 
>> Fabian
> 
>> 
>> 2016-11-14 8:39 GMT+01:00 Stephan Epping <<a href="x-msg://10/user/SendEmail.jtp?type=node&amp;node=10094&amp;i=0 <x-msg://10/user/SendEmail.jtp?type=node&amp;node=10094&amp;i=0>" target="_top" rel="nofollow" link="external" class="">[hidden email]>:
> 
>> Hello Fabian,
>> 
>> Thank you very much. What is your opinion on the following solution:
>> 
>> - Window data per time window, e.g. 15 minutes
>> - using processing time as trigger, e.g. 15 minutes
>> - which results in an aggregate over sensor values
>> - then use cassandra to select the previous aggregate (as there can be multiple for the time window due to processing time)
>> - then update the aggregate and put it into a cassandra sink again
>> 
>> The cassandra select will be a bit slower than using an in memory/flink state, but will be cheaper in the end. Further, what does this have for consequences?
>> For example, replaying events will be more difficult, right? Also, what about Snapshots? Will they work with the mentioned design?
>> 
>> kind regards,
>> Stephan
> 
>>> On 11 Nov 2016, at 00:39, Fabian Hueske <<a href="x-msg://10/user/SendEmail.jtp?type=node&amp;node=10094&amp;i=1 <x-msg://10/user/SendEmail.jtp?type=node&amp;node=10094&amp;i=1>" target="_top" rel="nofollow" link="external" class="">[hidden email]> wrote:
>>> 
> 
>>> Hi Stephan,
>>> 
>>> I just wrote an answer to your SO question. 
>>> 
>>> Best, Fabian
> 
>>> 
>>> 2016-11-10 11:01 GMT+01:00 Stephan Epping <<a href="x-msg://10/user/SendEmail.jtp?type=node&amp;node=10094&amp;i=2 <x-msg://10/user/SendEmail.jtp?type=node&amp;node=10094&amp;i=2>" target="_top" rel="nofollow" link="external" class="">[hidden email]>:
> 
>>> 
>>> Hello,
>>> 
>>> I found this question in the Nabble archive (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html>) but was unable/dont know how to reply.
>>> 
>>> Here is my question regarding the mentioned thread:
>>> 
>>>> Hello, 
>>>> 
>>>> I have similar requirements (see StackOverflor http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data <http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data>). I am pretty new to flink, could you elaborate on a possible solution? We can guarantee good ordering by sensor_id, thus watermarking by key would be the only reasonable way for us (sensorData.keyBy('id').timeWindow(1.minute).sum('value')), could I do my own watermarking aftersensorData.keyBy('id').overwriteWatermarking()... per key? Or maybe using custom state plus a custom trigger? What happens if a sensor dies or is being removed completely, how can this be detected as watermarks would be ignored for window garbage collection. Or could we dynamically schedule a job of each sensor? Which would result in 1000 Jobs.
>>> 
>>> 
>>> Thanks,
>>> Stephan
>>> 
>>> 
> 
>>> 
>> 
>> 
>> 
>> 
>> If you reply to this email, your message will be added to the discussion below:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10094.html <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10094.html>
>> To unsubscribe from Maintaining watermarks per key, instead of per operator instance, click here <>.
>> NAML <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
> 
> View this message in context: Re: Maintaining watermarks per key, instead of per operator instance <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10098.html>
> Sent from the Apache Flink User Mailing List archive. mailing list archive <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/> at Nabble.com <http://nabble.com/>.


Re: Maintaining watermarks per key, instead of per operator instance

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Stephan,
I was going to suggest that using a flatMap and tracking the timestamp of
each key yourself is a bit like having a per-key watermark. I wanted to
wait a bit before answering because I'm currently working on a new type of
Function that will be release with Flink 1.2: ProcessFunction. This is
somewhat like a FlatMap but also allows to access the element timestamp,
query current processing time/event time and set (per key) timers for
processing time and event time. With this you should be able to easily
implement your per-key tracking, I hope.

Cheers,
Aljoscha

P.S. ProcessFunction is already in the Flink repository but it's called
TimelyFlatMapFunction right now, because I was working on it under that
working title.

On Mon, 14 Nov 2016 at 15:47 kaelumania <st...@zweitag.de> wrote:

> Hey Fabian,
>
> thank you very much.
>
> - yes, I would window by event time and fire/purge by processing time
> - Cheaper in the end meant, that having too much state in the flink
> cluster would be more expensive, as we store all data in cassandra too.I
> think the fault tolerance would be okay, as we would make a compare and set
> with cassandra.
>
> With the flatMap Operator wouldn’t it be like running my own windowing
> mechanism? I need to keep the aggregate window per sensor open (with
> checkpointing and state management) until I receive an element for a sensor
> that is later in time than the windows time and then purge the state and
> emit a new event (which is like having a watermark per sensor). Further, I
> need a timer that fires like after 24 hours, in case a sensor dies and
> doesn’t send more data which might is possible with window
> assigner/trigger, right? But not inside normal functions, e.g. flatMap? We
> can guarantee that all sensor data per sensor comes almost in order (might
> be out of order within a few seconds), but there might be gaps of several
> hours after network partitions.
>
> There is now way to define/redefine the watermark per keyed stream? Or
> adjust the window assigner + trigger to achieve the desired behaviour? I am
> a bit reserved in implementing the whole state management. Do you plan to
> support such use cases on keyed streams? Maybe the WatermarkAssigner could
> also receive information about the key for wich the watermark should be
> calculated etc.
>
> best, Stephan
>
>
> On 14 Nov 2016, at 15:17, Fabian Hueske-2 [via Apache Flink User Mailing
> List archive.] <[hidden email]
> <http:///user/SendEmail.jtp?type=node&node=10098&i=0>> wrote:
>
> Hi Stephan,
>
> I'm skeptical about two things:
> - using processing time will result in inaccurately bounded aggregates (or
> do you want to group by event time in a processing time window?)
> - writing to and reading from Cassandra might be expensive (not sure what
> you mean by cheaper in the end) and it is not integrated with Flink's
> checkpointing mechanism for fault-tolerance.
>
> To me, the stateful FlatMapOperator looks like the best approach. There is
> an upcoming feature for registering timers in user-functions, i.e., a
> function is called after the timer exceeds. This could be helpful to
> overcome the problem of closing the window without new data.
>
> Best,
> Fabian
>
>
> 2016-11-14 8:39 GMT+01:00 Stephan Epping <<a
> href="x-msg://10/user/SendEmail.jtp?type=node&amp;node=10094&amp;i=0"
> target="_top" rel="nofollow" link="external" class="">[hidden email]>:
>
> Hello Fabian,
>
> Thank you very much. What is your opinion on the following solution:
>
> - Window data per time window, e.g. 15 minutes
> - using processing time as trigger, e.g. 15 minutes
> - which results in an aggregate over sensor values
> - then use cassandra to select the previous aggregate (as there can be
> multiple for the time window due to processing time)
> - then update the aggregate and put it into a cassandra sink again
>
> The cassandra select will be a bit slower than using an in memory/flink
> state, but will be cheaper in the end. Further, what does this have for
> consequences?
> For example, replaying events will be more difficult, right? Also, what
> about Snapshots? Will they work with the mentioned design?
>
> kind regards,
> Stephan
>
> On 11 Nov 2016, at 00:39, Fabian Hueske <<a
> href="x-msg://10/user/SendEmail.jtp?type=node&amp;node=10094&amp;i=1"
> target="_top" rel="nofollow" link="external" class="">[hidden email]> wrote:
>
> Hi Stephan,
>
> I just wrote an answer to your SO question.
>
> Best, Fabian
>
>
> 2016-11-10 11:01 GMT+01:00 Stephan Epping <<a
> href="x-msg://10/user/SendEmail.jtp?type=node&amp;node=10094&amp;i=2"
> target="_top" rel="nofollow" link="external" class="">[hidden email]>:
>
>
> Hello,
>
> I found this question in the Nabble archive (
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html)
> but was unable/dont know how to reply.
>
> Here is my question regarding the mentioned thread:
>
> Hello,
>
> I have similar requirements (see StackOverflor
> http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data).
> I am pretty new to flink, could you elaborate on a possible solution? We
> can guarantee good ordering by sensor_id, thus watermarking by key would be
> the only reasonable way for us (
> *sensorData.keyBy('id').timeWindow(1.minute).sum('value')*), could I do
> my own watermarking after*sensorData.keyBy('id').overwriteWatermarking()*...
> per key? Or maybe using custom state plus a custom trigger? What happens if
> a sensor dies or is being removed completely, how can this be detected as
> watermarks would be ignored for window garbage collection. Or could we
> dynamically schedule a job of each sensor? Which would result in 1000 Jobs.
>
>
> Thanks,
> Stephan
>
>
>
>
>
>
>
> ------------------------------
> If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10094.html
> To unsubscribe from Maintaining watermarks per key, instead of per
> operator instance, click here.
> NAML
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
>
>
>
> ------------------------------
> View this message in context: Re: Maintaining watermarks per key, instead
> of per operator instance
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10098.html>
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/> at
> Nabble.com.
>

Re: Maintaining watermarks per key, instead of per operator instance

Posted by kaelumania <st...@zweitag.de>.
Hey Fabian,

thank you very much. 

- yes, I would window by event time and fire/purge by processing time
- Cheaper in the end meant, that having too much state in the flink cluster would be more expensive, as we store all data in cassandra too.I think the fault tolerance would be okay, as we would make a compare and set with cassandra. 

With the flatMap Operator wouldn’t it be like running my own windowing mechanism? I need to keep the aggregate window per sensor open (with checkpointing and state management) until I receive an element for a sensor that is later in time than the windows time and then purge the state and emit a new event (which is like having a watermark per sensor). Further, I need a timer that fires like after 24 hours, in case a sensor dies and doesn’t send more data which might is possible with window assigner/trigger, right? But not inside normal functions, e.g. flatMap? We can guarantee that all sensor data per sensor comes almost in order (might be out of order within a few seconds), but there might be gaps of several hours after network partitions.

There is now way to define/redefine the watermark per keyed stream? Or adjust the window assigner + trigger to achieve the desired behaviour? I am a bit reserved in implementing the whole state management. Do you plan to support such use cases on keyed streams? Maybe the WatermarkAssigner could also receive information about the key for wich the watermark should be calculated etc.

best, Stephan


> On 14 Nov 2016, at 15:17, Fabian Hueske-2 [via Apache Flink User Mailing List archive.] <ml...@n4.nabble.com> wrote:
> 
> Hi Stephan,
> 
> I'm skeptical about two things: 
> - using processing time will result in inaccurately bounded aggregates (or do you want to group by event time in a processing time window?)
> - writing to and reading from Cassandra might be expensive (not sure what you mean by cheaper in the end) and it is not integrated with Flink's checkpointing mechanism for fault-tolerance.
> 
> To me, the stateful FlatMapOperator looks like the best approach. There is an upcoming feature for registering timers in user-functions, i.e., a function is called after the timer exceeds. This could be helpful to overcome the problem of closing the window without new data.
> 
> Best, 
> Fabian
> 
> 2016-11-14 8:39 GMT+01:00 Stephan Epping <[hidden email] <x-msg://10/user/SendEmail.jtp?type=node&node=10094&i=0>>:
> Hello Fabian,
> 
> Thank you very much. What is your opinion on the following solution:
> 
> - Window data per time window, e.g. 15 minutes
> - using processing time as trigger, e.g. 15 minutes
> - which results in an aggregate over sensor values
> - then use cassandra to select the previous aggregate (as there can be multiple for the time window due to processing time)
> - then update the aggregate and put it into a cassandra sink again
> 
> The cassandra select will be a bit slower than using an in memory/flink state, but will be cheaper in the end. Further, what does this have for consequences?
> For example, replaying events will be more difficult, right? Also, what about Snapshots? Will they work with the mentioned design?
> 
> kind regards,
> Stephan
> 
> 
>> On 11 Nov 2016, at 00:39, Fabian Hueske <[hidden email] <x-msg://10/user/SendEmail.jtp?type=node&node=10094&i=1>> wrote:
>> 
>> Hi Stephan,
>> 
>> I just wrote an answer to your SO question. 
>> 
>> Best, Fabian
>> 
>> 2016-11-10 11:01 GMT+01:00 Stephan Epping <[hidden email] <x-msg://10/user/SendEmail.jtp?type=node&node=10094&i=2>>:
>> Hello,
>> 
>> I found this question in the Nabble archive (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html>) but was unable/dont know how to reply.
>> 
>> Here is my question regarding the mentioned thread:
>> 
>>> Hello, 
>>> 
>>> I have similar requirements (see StackOverflor http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data <http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data>). I am pretty new to flink, could you elaborate on a possible solution? We can guarantee good ordering by sensor_id, thus watermarking by key would be the only reasonable way for us (sensorData.keyBy('id').timeWindow(1.minute).sum('value')), could I do my own watermarking aftersensorData.keyBy('id').overwriteWatermarking()... per key? Or maybe using custom state plus a custom trigger? What happens if a sensor dies or is being removed completely, how can this be detected as watermarks would be ignored for window garbage collection. Or could we dynamically schedule a job of each sensor? Which would result in 1000 Jobs.
>> 
>> 
>> Thanks,
>> Stephan
>> 
>> 
>> 
> 
> 
> 
> 
> If you reply to this email, your message will be added to the discussion below:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10094.html <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10094.html>
> To unsubscribe from Maintaining watermarks per key, instead of per operator instance, click here <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&node=7288&code=c3RlcGhhbi5lcHBpbmdAendlaXRhZy5kZXw3Mjg4fC0yNzYyODY4NzI=>.
> NAML <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>




--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10098.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Maintaining watermarks per key, instead of per operator instance

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Stephan,

I'm skeptical about two things:
- using processing time will result in inaccurately bounded aggregates (or
do you want to group by event time in a processing time window?)
- writing to and reading from Cassandra might be expensive (not sure what
you mean by cheaper in the end) and it is not integrated with Flink's
checkpointing mechanism for fault-tolerance.

To me, the stateful FlatMapOperator looks like the best approach. There is
an upcoming feature for registering timers in user-functions, i.e., a
function is called after the timer exceeds. This could be helpful to
overcome the problem of closing the window without new data.

Best,
Fabian

2016-11-14 8:39 GMT+01:00 Stephan Epping <st...@zweitag.de>:

> Hello Fabian,
>
> Thank you very much. What is your opinion on the following solution:
>
> - Window data per time window, e.g. 15 minutes
> - using processing time as trigger, e.g. 15 minutes
> - which results in an aggregate over sensor values
> - then use cassandra to select the previous aggregate (as there can be
> multiple for the time window due to processing time)
> - then update the aggregate and put it into a cassandra sink again
>
> The cassandra select will be a bit slower than using an in memory/flink
> state, but will be cheaper in the end. Further, what does this have for
> consequences?
> For example, replaying events will be more difficult, right? Also, what
> about Snapshots? Will they work with the mentioned design?
>
> kind regards,
> Stephan
>
>
> On 11 Nov 2016, at 00:39, Fabian Hueske <fh...@gmail.com> wrote:
>
> Hi Stephan,
>
> I just wrote an answer to your SO question.
>
> Best, Fabian
>
> 2016-11-10 11:01 GMT+01:00 Stephan Epping <st...@zweitag.de>:
>
>> Hello,
>>
>> I found this question in the Nabble archive (
>> http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/Maintaining-watermarks-per-key-instead-of-per-
>> operator-instance-tp7288.html) but was unable/dont know how to reply.
>>
>> Here is my question regarding the mentioned thread:
>>
>> Hello,
>>
>> I have similar requirements (see StackOverflor http://stac
>> koverflow.com/questions/40465335/apache-flink-multiple-
>> window-aggregations-and-late-data). I am pretty new to flink, could you
>> elaborate on a possible solution? We can guarantee good ordering by
>> sensor_id, thus watermarking by key would be the only reasonable way for us
>> (*sensorData.keyBy('id').timeWindow(1.minute).sum('value')*), could I do
>> my own watermarking after*sensorData.keyBy('id').overwriteWatermarking()*...
>> per key? Or maybe using custom state plus a custom trigger? What happens if
>> a sensor dies or is being removed completely, how can this be detected as
>> watermarks would be ignored for window garbage collection. Or could we
>> dynamically schedule a job of each sensor? Which would result in 1000 Jobs.
>>
>>
>> Thanks,
>> Stephan
>>
>>
>>
>
>

Re: Maintaining watermarks per key, instead of per operator instance

Posted by Stephan Epping <st...@zweitag.de>.
Hello Fabian,

Thank you very much. What is your opinion on the following solution:

- Window data per time window, e.g. 15 minutes
- using processing time as trigger, e.g. 15 minutes
- which results in an aggregate over sensor values
- then use cassandra to select the previous aggregate (as there can be multiple for the time window due to processing time)
- then update the aggregate and put it into a cassandra sink again

The cassandra select will be a bit slower than using an in memory/flink state, but will be cheaper in the end. Further, what does this have for consequences?
For example, replaying events will be more difficult, right? Also, what about Snapshots? Will they work with the mentioned design?

kind regards,
Stephan


> On 11 Nov 2016, at 00:39, Fabian Hueske <fh...@gmail.com> wrote:
> 
> Hi Stephan,
> 
> I just wrote an answer to your SO question. 
> 
> Best, Fabian
> 
> 2016-11-10 11:01 GMT+01:00 Stephan Epping <stephan.epping@zweitag.de <ma...@zweitag.de>>:
> Hello,
> 
> I found this question in the Nabble archive (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html>) but was unable/dont know how to reply.
> 
> Here is my question regarding the mentioned thread:
> 
>> Hello, 
>> 
>> I have similar requirements (see StackOverflor http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data <http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data>). I am pretty new to flink, could you elaborate on a possible solution? We can guarantee good ordering by sensor_id, thus watermarking by key would be the only reasonable way for us (sensorData.keyBy('id').timeWindow(1.minute).sum('value')), could I do my own watermarking aftersensorData.keyBy('id').overwriteWatermarking()... per key? Or maybe using custom state plus a custom trigger? What happens if a sensor dies or is being removed completely, how can this be detected as watermarks would be ignored for window garbage collection. Or could we dynamically schedule a job of each sensor? Which would result in 1000 Jobs.
> 
> 
> Thanks,
> Stephan
> 
> 
> 


Re: Maintaining watermarks per key, instead of per operator instance

Posted by Aljoscha Krettek <al...@apache.org>.
I think you would need something like this:

var hourlyDiscarding = stream
  .window(1.hour)
  .trigger(discarding)
  .apply(..)

//write to cassandra
hourlyDiscarding
  .window(1.hour)
  .trigger(accumulating)
  .apply(..)
  .addSink(cassandra)

//forward to next acc step
var daily = hourlyDiscarding
  .window(1.day)
  .trigger(accumulating)
  .apply(…)

//write to cassandra
daily.addSink(cassandra)

The decision between accumulating/discarding happens at the point where the
window is defined, not downstream (this is the same as in Beam).


On Wed, 23 Nov 2016 at 11:37 kaelumania <st...@zweitag.de> wrote:

> Sounds good to me. But I still need to have some kind of side output
> (cassandra) that stores the accumulating aggregates on each time scale
> (minute, hour). Thus I would need to have something like this
>
> var hourly = stream.window(1.hour).apply(..)
> //write to cassandra
> hourly.trigger(accumulating).addSink(cassandra)
> //forward to next acc step
> var daily = hourly.trigger(discarding).window(1.day).apply(…)
> //write to cassandra
> daily.trigger(accumulating).addSink(cassandra)
>
> Would this be possible?
>
> best, Stephan
>
> On 23 Nov 2016, at 11:16, Aljoscha Krettek [via Apache Flink User Mailing
> List archive.] <[hidden email]
> <http:///user/SendEmail.jtp?type=node&node=10295&i=0>> wrote:
>
> You can implement discarding behaviour by writing a custom trigger (based
> on EventTimeTrigger) that returns FIRE_AND_PURGE when firing. With this you
> could maybe implement a cascade of windows where the first aggregates for
> the smallest time interval and is discarding and where the other triggers
> take these "pre-aggregated" values and accumulate.
>
> On Tue, 22 Nov 2016 at 08:11 Stephan Epping <<a
> href="x-msg://8/user/SendEmail.jtp?type=node&amp;node=10294&amp;i=0"
> target="_top" rel="nofollow" link="external" class="">[hidden email]> wrote:
>
> Hey Aljoscha,
>
> the first solution did not work out as expected. As when late elements
> arrive the first window is triggered again and would emit a new
> (accumulated) event, that would be counted twice (in time accumulation and
> late accumulation) in the second window.I could implement my own
> (discarding strategy) like in Apache Beam, but the out stream should
> contain accumulated events that are stored in cassandra. The second
> solution just gave an compiler error, thus I think is not possible right
> now.
>
> best Stephan
>
> On 21 Nov 2016, at 17:56, Aljoscha Krettek <<a
> href="x-msg://8/user/SendEmail.jtp?type=node&amp;node=10294&amp;i=1"
> target="_top" rel="nofollow" link="external" class="">[hidden email]> wrote:
>
> Hi,
> why did you settle for the last solution?
>
> Cheers,
> Aljoscha
>
> On Thu, 17 Nov 2016 at 15:57 kaelumania <<a
> href="x-msg://8/user/SendEmail.jtp?type=node&amp;node=10294&amp;i=2"
> target="_top" rel="nofollow" link="external" class="">[hidden email]> wrote:
>
> Hi Fabian,
>
> your proposed solution for:
>
>
>    1. Multiple window aggregations
>
> You can construct a data flow of cascading window operators and fork off
> (to emit or further processing) the result after each window.
>
> Input -> window(15 secs) -> window(1 min) -> window(15 min) -> ...
>                         \-> out_1        \-> out_2         \-> out_3
>
> does not work, am I missing something?
>
> First I tried the following
>
> DataStream<Reading> values = input.assignTimestampsAndWatermarks(new StrictWatermarkAssigner()); // force lateness
>
> DataStream<ReadingAggregate> aggregatesPerMinute = values
>         .keyBy("id")
>         .timeWindow(Time.minutes(1))
>         .allowedLateness(Time.minutes(2))
>         .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>
> DataStream<ReadingAggregate> aggregatesPerHour = aggregatesPerMinute
>         .keyBy("id")
>         .timeWindow(Time.hours(1))
>         .allowedLateness(Time.hours(2))
>         .apply(new AggregateReadingAggregates(), new AggregateReadingAggregates());
>
> but due to late data the first fold function would emit 2 rolling
> aggregates (one with and one without the late element), which results in
> being counted twice within the second reducer. Therefore i tried
>
> WindowedStream<Reading, Tuple, TimeWindow> readingsPerMinute = input
>         .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // force lateness
>         .keyBy("id")
>         .timeWindow(Time.minutes(1))
>         .allowedLateness(Time.hours(2));
>
> WindowedStream<Reading, Tuple, TimeWindow> readingsPerHours = readingsPerMinute
>         .timeWindow(Time.hours(1))
>         .allowedLateness(Time.hours(2));
>
> DataStream<ReadingAggregate> aggregatesPerMinute = readingsPerMinute.apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
> DataStream<ReadingAggregate> aggregatesPerHour = readingsPerHours.apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>
> which gives me a compiler error as WindowedStream does not provide a
> timeWindow method.
>
> Finally I settled with this:
>
> KeyedStream<Reading, Tuple> readings = input
>         .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // force lateness
>         .keyBy("id");
>
> DataStream<ReadingAggregate> aggregatesPerMinute = readings
>         .timeWindow(Time.minutes(1))
>         .allowedLateness(Time.hours(2))
>         .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>
> DataStream<ReadingAggregate> aggregatesPerHour = readings
>         .timeWindow(Time.hours(1))
>         .allowedLateness(Time.hours(2))
>         .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>
>
>
> Feedback is very welcome.
>
> best, Stephan
>
>
>
> On 11 Nov 2016, at 00:29, Fabian Hueske-2 [via Apache Flink User Mailing
> List archive.] <[hidden email]
> <http://user/SendEmail.jtp?type=node&node=10179&i=0>> wrote:
>
> Hi Stephan,
>
> I just wrote an answer to your SO question.
>
> Best, Fabian
>
>
> 2016-11-10 11:01 GMT+01:00 Stephan Epping <<a href="
> x-msg://3/user/SendEmail.jtp?type=node&amp;node=10033&amp;i=0"
> target="_top" rel="nofollow" link="external" class="">[hidden email]>:
>
>
> Hello,
>
> I found this question in the Nabble archive (
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html)
> but was unable/dont know how to reply.
>
> Here is my question regarding the mentioned thread:
>
> Hello,
>
> I have similar requirements (see StackOverflor
> http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data).
> I am pretty new to flink, could you elaborate on a possible solution? We
> can guarantee good ordering by sensor_id, thus watermarking by key would be
> the only reasonable way for us (
> *sensorData.keyBy('id').timeWindow(1.minute).sum('value')*), could I do
> my own watermarking after*sensorData.keyBy('id').overwriteWatermarking()*...
> per key? Or maybe using custom state plus a custom trigger? What happens if
> a sensor dies or is being removed completely, how can this be detected as
> watermarks would be ignored for window garbage collection. Or could we
> dynamically schedule a job of each sensor? Which would result in 1000 Jobs.
>
>
> Thanks,
> Stephan
>
>
> If you reply to this email, your message will be added to the discussion
> below:
>
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10033.html
>
> To unsubscribe from Maintaining watermarks per key, instead of per
> operator instance, click here.
> NAML
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
>
>
> ------------------------------
> View this message in context: Re: Maintaining watermarks per key, instead
> of per operator instance
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10179.html>
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/> at
> Nabble.com <http://nabble.com/>.
>
> If you reply to this email, your message will be added to the discussion
> below:
>
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10294.html
>
> To unsubscribe from Maintaining watermarks per key, instead of per
> operator instance, click here.
> NAML
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
>
>
> ------------------------------
> View this message in context: Re: Maintaining watermarks per key, instead
> of per operator instance
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10295.html>
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/> at
> Nabble.com.
>

Re: Maintaining watermarks per key, instead of per operator instance

Posted by kaelumania <st...@zweitag.de>.
Sounds good to me. But I still need to have some kind of side output (cassandra) that stores the accumulating aggregates on each time scale (minute, hour). Thus I would need to have something like this

var hourly = stream.window(1.hour).apply(..)
//write to cassandra
hourly.trigger(accumulating).addSink(cassandra)
//forward to next acc step
var daily = hourly.trigger(discarding).window(1.day).apply(…)
//write to cassandra
daily.trigger(accumulating).addSink(cassandra)

Would this be possible?

best, Stephan
> On 23 Nov 2016, at 11:16, Aljoscha Krettek [via Apache Flink User Mailing List archive.] <ml...@n4.nabble.com> wrote:
> 
> You can implement discarding behaviour by writing a custom trigger (based on EventTimeTrigger) that returns FIRE_AND_PURGE when firing. With this you could maybe implement a cascade of windows where the first aggregates for the smallest time interval and is discarding and where the other triggers take these "pre-aggregated" values and accumulate.
> 
> On Tue, 22 Nov 2016 at 08:11 Stephan Epping <[hidden email] <x-msg://8/user/SendEmail.jtp?type=node&node=10294&i=0>> wrote:
> Hey Aljoscha,
> 
> the first solution did not work out as expected. As when late elements arrive the first window is triggered again and would emit a new (accumulated) event, that would be counted twice (in time accumulation and late accumulation) in the second window.I could implement my own (discarding strategy) like in Apache Beam, but the out stream should contain accumulated events that are stored in cassandra. The second solution just gave an compiler error, thus I think is not possible right now.
> 
> best Stephan
> 
> 
> 
>> On 21 Nov 2016, at 17:56, Aljoscha Krettek <[hidden email] <x-msg://8/user/SendEmail.jtp?type=node&node=10294&i=1>> wrote:
>> 
>> Hi,
>> why did you settle for the last solution?
>> 
>> Cheers,
>> Aljoscha
>> 
>> On Thu, 17 Nov 2016 at 15:57 kaelumania <[hidden email] <x-msg://8/user/SendEmail.jtp?type=node&node=10294&i=2>> wrote:
>> Hi Fabian,
>> 
>> your proposed solution for:
>>  
>> Multiple window aggregations
>> You can construct a data flow of cascading window operators and fork off (to emit or further processing) the result after each window.
>> 
>> Input -> window(15 secs) -> window(1 min) -> window(15 min) -> ...
>>                         \-> out_1        \-> out_2         \-> out_3
>> does not work, am I missing something?
>> 
>> First I tried the following
>> DataStream<Reading> values = input.assignTimestampsAndWatermarks(new StrictWatermarkAssigner()); // force lateness
>> 
>> DataStream<ReadingAggregate> aggregatesPerMinute = values
>>         .keyBy("id")
>>         .timeWindow(Time.minutes(1))
>>         .allowedLateness(Time.minutes(2))
>>         .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>> 
>> DataStream<ReadingAggregate> aggregatesPerHour = aggregatesPerMinute
>>         .keyBy("id")
>>         .timeWindow(Time.hours(1))
>>         .allowedLateness(Time.hours(2))
>>         .apply(new AggregateReadingAggregates(), new AggregateReadingAggregates());
>> but due to late data the first fold function would emit 2 rolling aggregates (one with and one without the late element), which results in being counted twice within the second reducer. Therefore i tried
>> WindowedStream<Reading, Tuple, TimeWindow> readingsPerMinute = input
>>         .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // force lateness
>>         .keyBy("id")
>>         .timeWindow(Time.minutes(1))
>>         .allowedLateness(Time.hours(2));
>> 
>> WindowedStream<Reading, Tuple, TimeWindow> readingsPerHours = readingsPerMinute
>>         .timeWindow(Time.hours(1))
>>         .allowedLateness(Time.hours(2));
>> 
>> DataStream<ReadingAggregate> aggregatesPerMinute = readingsPerMinute.apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>> DataStream<ReadingAggregate> aggregatesPerHour = readingsPerHours.apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>> which gives me a compiler error as WindowedStream does not provide a timeWindow method.
>> 
>> Finally I settled with this:
>> KeyedStream<Reading, Tuple> readings = input
>>         .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // force lateness
>>         .keyBy("id");
>> 
>> DataStream<ReadingAggregate> aggregatesPerMinute = readings
>>         .timeWindow(Time.minutes(1))
>>         .allowedLateness(Time.hours(2))
>>         .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>> 
>> DataStream<ReadingAggregate> aggregatesPerHour = readings
>>         .timeWindow(Time.hours(1))
>>         .allowedLateness(Time.hours(2))
>>         .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>> 
>> 
>> Feedback is very welcome.
>> 
>> best, Stephan
>> 
>> 
>> 
>> 
>>> On 11 Nov 2016, at 00:29, Fabian Hueske-2 [via Apache Flink User Mailing List archive.] <[hidden email] <http://user/SendEmail.jtp?type=node&node=10179&i=0>> wrote:
>>> 
>> 
>>> Hi Stephan,
>>> 
>>> I just wrote an answer to your SO question. 
>>> 
>>> Best, Fabian
>> 
>>> 
>>> 2016-11-10 11:01 GMT+01:00 Stephan Epping <<a href="x-msg://3/user/SendEmail.jtp?type=node&amp;node=10033&amp;i=0 <>" target="_top" rel="nofollow" link="external" class="">[hidden email]>:
>> 
>>> 
>>> Hello,
>>> 
>>> I found this question in the Nabble archive (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html>) but was unable/dont know how to reply.
>>> 
>>> Here is my question regarding the mentioned thread:
>>> 
>>>> Hello, 
>>>> 
>>>> I have similar requirements (see StackOverflor http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data <http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data>). I am pretty new to flink, could you elaborate on a possible solution? We can guarantee good ordering by sensor_id, thus watermarking by key would be the only reasonable way for us (sensorData.keyBy('id').timeWindow(1.minute).sum('value')), could I do my own watermarking aftersensorData.keyBy('id').overwriteWatermarking()... per key? Or maybe using custom state plus a custom trigger? What happens if a sensor dies or is being removed completely, how can this be detected as watermarks would be ignored for window garbage collection. Or could we dynamically schedule a job of each sensor? Which would result in 1000 Jobs.
>>> 
>>> 
>>> Thanks,
>>> Stephan
>>> 
>>> 
>> 
>>> If you reply to this email, your message will be added to the discussion below:
>> 
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10033.html <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10033.html>
>>> To unsubscribe from Maintaining watermarks per key, instead of per operator instance, click here <>.
>>> NAML <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
>> View this message in context: Re: Maintaining watermarks per key, instead of per operator instance <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10179.html>
>> Sent from the Apache Flink User Mailing List archive. mailing list archive <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/> at Nabble.com <http://nabble.com/>.
> 
> 
> 
> If you reply to this email, your message will be added to the discussion below:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10294.html <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10294.html>
> To unsubscribe from Maintaining watermarks per key, instead of per operator instance, click here <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&node=7288&code=c3RlcGhhbi5lcHBpbmdAendlaXRhZy5kZXw3Mjg4fC0yNzYyODY4NzI=>.
> NAML <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>




--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10295.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Maintaining watermarks per key, instead of per operator instance

Posted by Aljoscha Krettek <al...@apache.org>.
You can implement discarding behaviour by writing a custom trigger (based
on EventTimeTrigger) that returns FIRE_AND_PURGE when firing. With this you
could maybe implement a cascade of windows where the first aggregates for
the smallest time interval and is discarding and where the other triggers
take these "pre-aggregated" values and accumulate.

On Tue, 22 Nov 2016 at 08:11 Stephan Epping <st...@zweitag.de>
wrote:

> Hey Aljoscha,
>
> the first solution did not work out as expected. As when late elements
> arrive the first window is triggered again and would emit a new
> (accumulated) event, that would be counted twice (in time accumulation and
> late accumulation) in the second window.I could implement my own
> (discarding strategy) like in Apache Beam, but the out stream should
> contain accumulated events that are stored in cassandra. The second
> solution just gave an compiler error, thus I think is not possible right
> now.
>
> best Stephan
>
>
>
> On 21 Nov 2016, at 17:56, Aljoscha Krettek <al...@apache.org> wrote:
>
> Hi,
> why did you settle for the last solution?
>
> Cheers,
> Aljoscha
>
> On Thu, 17 Nov 2016 at 15:57 kaelumania <st...@zweitag.de> wrote:
>
> Hi Fabian,
>
> your proposed solution for:
>
>
>    1. Multiple window aggregations
>
> You can construct a data flow of cascading window operators and fork off
> (to emit or further processing) the result after each window.
>
> Input -> window(15 secs) -> window(1 min) -> window(15 min) -> ...
>                         \-> out_1        \-> out_2         \-> out_3
>
> does not work, am I missing something?
>
> First I tried the following
>
> DataStream<Reading> values = input.assignTimestampsAndWatermarks(new StrictWatermarkAssigner()); // force lateness
>
> DataStream<ReadingAggregate> aggregatesPerMinute = values
>         .keyBy("id")
>         .timeWindow(Time.minutes(1))
>         .allowedLateness(Time.minutes(2))
>         .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>
> DataStream<ReadingAggregate> aggregatesPerHour = aggregatesPerMinute
>         .keyBy("id")
>         .timeWindow(Time.hours(1))
>         .allowedLateness(Time.hours(2))
>         .apply(new AggregateReadingAggregates(), new AggregateReadingAggregates());
>
> but due to late data the first fold function would emit 2 rolling
> aggregates (one with and one without the late element), which results in
> being counted twice within the second reducer. Therefore i tried
>
> WindowedStream<Reading, Tuple, TimeWindow> readingsPerMinute = input
>         .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // force lateness
>         .keyBy("id")
>         .timeWindow(Time.minutes(1))
>         .allowedLateness(Time.hours(2));
>
> WindowedStream<Reading, Tuple, TimeWindow> readingsPerHours = readingsPerMinute
>         .timeWindow(Time.hours(1))
>         .allowedLateness(Time.hours(2));
>
> DataStream<ReadingAggregate> aggregatesPerMinute = readingsPerMinute.apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
> DataStream<ReadingAggregate> aggregatesPerHour = readingsPerHours.apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>
> which gives me a compiler error as WindowedStream does not provide a
> timeWindow method.
>
> Finally I settled with this:
>
> KeyedStream<Reading, Tuple> readings = input
>         .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // force lateness
>         .keyBy("id");
>
> DataStream<ReadingAggregate> aggregatesPerMinute = readings
>         .timeWindow(Time.minutes(1))
>         .allowedLateness(Time.hours(2))
>         .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>
> DataStream<ReadingAggregate> aggregatesPerHour = readings
>         .timeWindow(Time.hours(1))
>         .allowedLateness(Time.hours(2))
>         .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>
>
>
> Feedback is very welcome.
>
> best, Stephan
>
>
>
> On 11 Nov 2016, at 00:29, Fabian Hueske-2 [via Apache Flink User Mailing
> List archive.] <[hidden email]
> <http://user/SendEmail.jtp?type=node&node=10179&i=0>> wrote:
>
> Hi Stephan,
>
> I just wrote an answer to your SO question.
>
> Best, Fabian
>
>
> 2016-11-10 11:01 GMT+01:00 Stephan Epping <<a href="
> x-msg://3/user/SendEmail.jtp?type=node&amp;node=10033&amp;i=0"
> target="_top" rel="nofollow" link="external" class="">[hidden email]>:
>
>
> Hello,
>
> I found this question in the Nabble archive (
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html)
> but was unable/dont know how to reply.
>
> Here is my question regarding the mentioned thread:
>
> Hello,
>
> I have similar requirements (see StackOverflor
> http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data).
> I am pretty new to flink, could you elaborate on a possible solution? We
> can guarantee good ordering by sensor_id, thus watermarking by key would be
> the only reasonable way for us (
> *sensorData.keyBy('id').timeWindow(1.minute).sum('value')*), could I do
> my own watermarking after*sensorData.keyBy('id').overwriteWatermarking()*...
> per key? Or maybe using custom state plus a custom trigger? What happens if
> a sensor dies or is being removed completely, how can this be detected as
> watermarks would be ignored for window garbage collection. Or could we
> dynamically schedule a job of each sensor? Which would result in 1000 Jobs.
>
>
> Thanks,
> Stephan
>
>
> If you reply to this email, your message will be added to the discussion
> below:
>
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10033.html
>
> To unsubscribe from Maintaining watermarks per key, instead of per
> operator instance, click here.
> NAML
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
>
>
> ------------------------------
> View this message in context: Re: Maintaining watermarks per key, instead
> of per operator instance
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10179.html>
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/> at
> Nabble.com.
>
>
>

Re: Maintaining watermarks per key, instead of per operator instance

Posted by Stephan Epping <st...@zweitag.de>.
Hey Aljoscha,

the first solution did not work out as expected. As when late elements arrive the first window is triggered again and would emit a new (accumulated) event, that would be counted twice (in time accumulation and late accumulation) in the second window.I could implement my own (discarding strategy) like in Apache Beam, but the out stream should contain accumulated events that are stored in cassandra. The second solution just gave an compiler error, thus I think is not possible right now.

best Stephan



> On 21 Nov 2016, at 17:56, Aljoscha Krettek <al...@apache.org> wrote:
> 
> Hi,
> why did you settle for the last solution?
> 
> Cheers,
> Aljoscha
> 
> On Thu, 17 Nov 2016 at 15:57 kaelumania <stephan.epping@zweitag.de <ma...@zweitag.de>> wrote:
> Hi Fabian,
> 
> your proposed solution for:
>  
> Multiple window aggregations
> You can construct a data flow of cascading window operators and fork off (to emit or further processing) the result after each window.
> 
> Input -> window(15 secs) -> window(1 min) -> window(15 min) -> ...
>                         \-> out_1        \-> out_2         \-> out_3
> does not work, am I missing something?
> 
> First I tried the following
> DataStream<Reading> values = input.assignTimestampsAndWatermarks(new StrictWatermarkAssigner()); // force lateness
> 
> DataStream<ReadingAggregate> aggregatesPerMinute = values
>         .keyBy("id")
>         .timeWindow(Time.minutes(1))
>         .allowedLateness(Time.minutes(2))
>         .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
> 
> DataStream<ReadingAggregate> aggregatesPerHour = aggregatesPerMinute
>         .keyBy("id")
>         .timeWindow(Time.hours(1))
>         .allowedLateness(Time.hours(2))
>         .apply(new AggregateReadingAggregates(), new AggregateReadingAggregates());
> but due to late data the first fold function would emit 2 rolling aggregates (one with and one without the late element), which results in being counted twice within the second reducer. Therefore i tried
> WindowedStream<Reading, Tuple, TimeWindow> readingsPerMinute = input
>         .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // force lateness
>         .keyBy("id")
>         .timeWindow(Time.minutes(1))
>         .allowedLateness(Time.hours(2));
> 
> WindowedStream<Reading, Tuple, TimeWindow> readingsPerHours = readingsPerMinute
>         .timeWindow(Time.hours(1))
>         .allowedLateness(Time.hours(2));
> 
> DataStream<ReadingAggregate> aggregatesPerMinute = readingsPerMinute.apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
> DataStream<ReadingAggregate> aggregatesPerHour = readingsPerHours.apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
> which gives me a compiler error as WindowedStream does not provide a timeWindow method.
> 
> Finally I settled with this:
> KeyedStream<Reading, Tuple> readings = input
>         .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // force lateness
>         .keyBy("id");
> 
> DataStream<ReadingAggregate> aggregatesPerMinute = readings
>         .timeWindow(Time.minutes(1))
>         .allowedLateness(Time.hours(2))
>         .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
> 
> DataStream<ReadingAggregate> aggregatesPerHour = readings
>         .timeWindow(Time.hours(1))
>         .allowedLateness(Time.hours(2))
>         .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
> 
> 
> Feedback is very welcome.
> 
> best, Stephan
> 
> 
> 
> 
>> On 11 Nov 2016, at 00:29, Fabian Hueske-2 [via Apache Flink User Mailing List archive.] <[hidden email] <http://user/SendEmail.jtp?type=node&node=10179&i=0>> wrote:
>> 
> 
>> Hi Stephan,
>> 
>> I just wrote an answer to your SO question. 
>> 
>> Best, Fabian
> 
>> 
>> 2016-11-10 11:01 GMT+01:00 Stephan Epping <<a href="x-msg://3/user/SendEmail.jtp?type=node&amp;node=10033&amp;i=0" target="_top" rel="nofollow" link="external" class="">[hidden email]>:
> 
>> 
>> Hello,
>> 
>> I found this question in the Nabble archive (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html>) but was unable/dont know how to reply.
>> 
>> Here is my question regarding the mentioned thread:
>> 
>>> Hello, 
>>> 
>>> I have similar requirements (see StackOverflor http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data <http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data>). I am pretty new to flink, could you elaborate on a possible solution? We can guarantee good ordering by sensor_id, thus watermarking by key would be the only reasonable way for us (sensorData.keyBy('id').timeWindow(1.minute).sum('value')), could I do my own watermarking aftersensorData.keyBy('id').overwriteWatermarking()... per key? Or maybe using custom state plus a custom trigger? What happens if a sensor dies or is being removed completely, how can this be detected as watermarks would be ignored for window garbage collection. Or could we dynamically schedule a job of each sensor? Which would result in 1000 Jobs.
>> 
>> 
>> Thanks,
>> Stephan
>> 
>> 
> 
>> If you reply to this email, your message will be added to the discussion below:
> 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10033.html <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10033.html>
>> To unsubscribe from Maintaining watermarks per key, instead of per operator instance, click here <>.
>> NAML <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
> View this message in context: Re: Maintaining watermarks per key, instead of per operator instance <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10179.html>
> Sent from the Apache Flink User Mailing List archive. mailing list archive <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/> at Nabble.com.


Re: Maintaining watermarks per key, instead of per operator instance

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
why did you settle for the last solution?

Cheers,
Aljoscha

On Thu, 17 Nov 2016 at 15:57 kaelumania <st...@zweitag.de> wrote:

> Hi Fabian,
>
> your proposed solution for:
>
>
>    1. Multiple window aggregations
>
> You can construct a data flow of cascading window operators and fork off
> (to emit or further processing) the result after each window.
>
> Input -> window(15 secs) -> window(1 min) -> window(15 min) -> ...
>                         \-> out_1        \-> out_2         \-> out_3
>
> does not work, am I missing something?
>
> First I tried the following
>
> DataStream<Reading> values = input.assignTimestampsAndWatermarks(new StrictWatermarkAssigner()); // force lateness
>
> DataStream<ReadingAggregate> aggregatesPerMinute = values
>         .keyBy("id")
>         .timeWindow(Time.minutes(1))
>         .allowedLateness(Time.minutes(2))
>         .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>
> DataStream<ReadingAggregate> aggregatesPerHour = aggregatesPerMinute
>         .keyBy("id")
>         .timeWindow(Time.hours(1))
>         .allowedLateness(Time.hours(2))
>         .apply(new AggregateReadingAggregates(), new AggregateReadingAggregates());
>
> but due to late data the first fold function would emit 2 rolling
> aggregates (one with and one without the late element), which results in
> being counted twice within the second reducer. Therefore i tried
>
> WindowedStream<Reading, Tuple, TimeWindow> readingsPerMinute = input
>         .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // force lateness
>         .keyBy("id")
>         .timeWindow(Time.minutes(1))
>         .allowedLateness(Time.hours(2));
>
> WindowedStream<Reading, Tuple, TimeWindow> readingsPerHours = readingsPerMinute
>         .timeWindow(Time.hours(1))
>         .allowedLateness(Time.hours(2));
>
> DataStream<ReadingAggregate> aggregatesPerMinute = readingsPerMinute.apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
> DataStream<ReadingAggregate> aggregatesPerHour = readingsPerHours.apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>
> which gives me a compiler error as WindowedStream does not provide a
> timeWindow method.
>
> Finally I settled with this:
>
> KeyedStream<Reading, Tuple> readings = input
>         .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // force lateness
>         .keyBy("id");
>
> DataStream<ReadingAggregate> aggregatesPerMinute = readings
>         .timeWindow(Time.minutes(1))
>         .allowedLateness(Time.hours(2))
>         .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>
> DataStream<ReadingAggregate> aggregatesPerHour = readings
>         .timeWindow(Time.hours(1))
>         .allowedLateness(Time.hours(2))
>         .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>
>
>
> Feedback is very welcome.
>
> best, Stephan
>
>
>
> On 11 Nov 2016, at 00:29, Fabian Hueske-2 [via Apache Flink User Mailing
> List archive.] <[hidden email]
> <http:///user/SendEmail.jtp?type=node&node=10179&i=0>> wrote:
>
> Hi Stephan,
>
> I just wrote an answer to your SO question.
>
> Best, Fabian
>
>
> 2016-11-10 11:01 GMT+01:00 Stephan Epping <<a
> href="x-msg://3/user/SendEmail.jtp?type=node&amp;node=10033&amp;i=0"
> target="_top" rel="nofollow" link="external" class="">[hidden email]>:
>
>
> Hello,
>
> I found this question in the Nabble archive (
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html)
> but was unable/dont know how to reply.
>
> Here is my question regarding the mentioned thread:
>
> Hello,
>
> I have similar requirements (see StackOverflor
> http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data).
> I am pretty new to flink, could you elaborate on a possible solution? We
> can guarantee good ordering by sensor_id, thus watermarking by key would be
> the only reasonable way for us (
> *sensorData.keyBy('id').timeWindow(1.minute).sum('value')*), could I do
> my own watermarking after*sensorData.keyBy('id').overwriteWatermarking()*...
> per key? Or maybe using custom state plus a custom trigger? What happens if
> a sensor dies or is being removed completely, how can this be detected as
> watermarks would be ignored for window garbage collection. Or could we
> dynamically schedule a job of each sensor? Which would result in 1000 Jobs.
>
>
> Thanks,
> Stephan
>
>
> If you reply to this email, your message will be added to the discussion
> below:
>
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10033.html
>
> To unsubscribe from Maintaining watermarks per key, instead of per
> operator instance, click here.
> NAML
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
>
>
> ------------------------------
> View this message in context: Re: Maintaining watermarks per key, instead
> of per operator instance
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10179.html>
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/> at
> Nabble.com.
>

Re: Maintaining watermarks per key, instead of per operator instance

Posted by kaelumania <st...@zweitag.de>.
Hi Fabian,

your proposed solution for:
 
Multiple window aggregations
You can construct a data flow of cascading window operators and fork off (to emit or further processing) the result after each window.

Input -> window(15 secs) -> window(1 min) -> window(15 min) -> ...
                        \-> out_1        \-> out_2         \-> out_3
does not work, am I missing something?

First I tried the following
DataStream<Reading> values = input.assignTimestampsAndWatermarks(new StrictWatermarkAssigner()); // force lateness

DataStream<ReadingAggregate> aggregatesPerMinute = values
        .keyBy("id")
        .timeWindow(Time.minutes(1))
        .allowedLateness(Time.minutes(2))
        .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());

DataStream<ReadingAggregate> aggregatesPerHour = aggregatesPerMinute
        .keyBy("id")
        .timeWindow(Time.hours(1))
        .allowedLateness(Time.hours(2))
        .apply(new AggregateReadingAggregates(), new AggregateReadingAggregates());
but due to late data the first fold function would emit 2 rolling aggregates (one with and one without the late element), which results in being counted twice within the second reducer. Therefore i tried
WindowedStream<Reading, Tuple, TimeWindow> readingsPerMinute = input
        .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // force lateness
        .keyBy("id")
        .timeWindow(Time.minutes(1))
        .allowedLateness(Time.hours(2));

WindowedStream<Reading, Tuple, TimeWindow> readingsPerHours = readingsPerMinute
        .timeWindow(Time.hours(1))
        .allowedLateness(Time.hours(2));

DataStream<ReadingAggregate> aggregatesPerMinute = readingsPerMinute.apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
DataStream<ReadingAggregate> aggregatesPerHour = readingsPerHours.apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
which gives me a compiler error as WindowedStream does not provide a timeWindow method.

Finally I settled with this:
KeyedStream<Reading, Tuple> readings = input
        .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // force lateness
        .keyBy("id");

DataStream<ReadingAggregate> aggregatesPerMinute = readings
        .timeWindow(Time.minutes(1))
        .allowedLateness(Time.hours(2))
        .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());

DataStream<ReadingAggregate> aggregatesPerHour = readings
        .timeWindow(Time.hours(1))
        .allowedLateness(Time.hours(2))
        .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());


Feedback is very welcome.

best, Stephan



> On 11 Nov 2016, at 00:29, Fabian Hueske-2 [via Apache Flink User Mailing List archive.] <ml...@n4.nabble.com> wrote:
> 
> Hi Stephan,
> 
> I just wrote an answer to your SO question. 
> 
> Best, Fabian
> 
> 2016-11-10 11:01 GMT+01:00 Stephan Epping <[hidden email] <x-msg://3/user/SendEmail.jtp?type=node&node=10033&i=0>>:
> Hello,
> 
> I found this question in the Nabble archive (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html>) but was unable/dont know how to reply.
> 
> Here is my question regarding the mentioned thread:
> 
>> Hello, 
>> 
>> I have similar requirements (see StackOverflor http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data <http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data>). I am pretty new to flink, could you elaborate on a possible solution? We can guarantee good ordering by sensor_id, thus watermarking by key would be the only reasonable way for us (sensorData.keyBy('id').timeWindow(1.minute).sum('value')), could I do my own watermarking aftersensorData.keyBy('id').overwriteWatermarking()... per key? Or maybe using custom state plus a custom trigger? What happens if a sensor dies or is being removed completely, how can this be detected as watermarks would be ignored for window garbage collection. Or could we dynamically schedule a job of each sensor? Which would result in 1000 Jobs.
> 
> 
> Thanks,
> Stephan
> 
> 
> 
> 
> 
> If you reply to this email, your message will be added to the discussion below:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10033.html <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10033.html>
> To unsubscribe from Maintaining watermarks per key, instead of per operator instance, click here <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&node=7288&code=c3RlcGhhbi5lcHBpbmdAendlaXRhZy5kZXw3Mjg4fC0yNzYyODY4NzI=>.
> NAML <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>




--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10179.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Maintaining watermarks per key, instead of per operator instance

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Stephan,

I just wrote an answer to your SO question.

Best, Fabian

2016-11-10 11:01 GMT+01:00 Stephan Epping <st...@zweitag.de>:

> Hello,
>
> I found this question in the Nabble archive (http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Maintaining-
> watermarks-per-key-instead-of-per-operator-instance-tp7288.html) but was
> unable/dont know how to reply.
>
> Here is my question regarding the mentioned thread:
>
> Hello,
>
> I have similar requirements (see StackOverflor http://
> stackoverflow.com/questions/40465335/apache-flink-
> multiple-window-aggregations-and-late-data). I am pretty new to flink,
> could you elaborate on a possible solution? We can guarantee good ordering
> by sensor_id, thus watermarking by key would be the only reasonable way for
> us (*sensorData.keyBy('id').timeWindow(1.minute).sum('value')*), could I
> do my own watermarking after
> *sensorData.keyBy('id').overwriteWatermarking()*... per key? Or maybe
> using custom state plus a custom trigger? What happens if a sensor dies or
> is being removed completely, how can this be detected as watermarks would
> be ignored for window garbage collection. Or could we dynamically schedule
> a job of each sensor? Which would result in 1000 Jobs.
>
>
> Thanks,
> Stephan
>
>
>