You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Dhruv Kumar <ga...@gmail.com> on 2018/03/18 03:10:46 UTC

Custom Processing per window

Hi

I am a CS PhD student at UMN Twin Cities. I am trying to use Flink for implementing some very specific use-cases: (They may not seem relevant but I need to implement them or I at least need to know if it is possible to implement them in Flink)

Assumptions:
1. Data stream is of the form (key, value). We achieve this by the .key operation provided by Flink API.
2. By emitting a key, I mean sending/outputting its aggregated value to any data sink. 

1. For each Tumbling window in the Event Time space, for each key, I would like to aggregate its value until it crosses a particular threshold (same threshold for all the keys). As soon as the key’s aggregated value crosses this threshold, I would like to emit this key. At the end of every tumbling window, all the (key, value) aggregated pairs  would be emitted irrespective of whether they have crossed the threshold or not.

2. For each Tumbling window in the event time space, I would like to maintain a LRU cache which stores the keys along with their aggregated values and their latest arrival time. The least recently used (LRU) key would be the key whose latest arrival time is earlier than the latest arrival times of all the other keys present in the LRU cache. The LRU cache is of a limited size. So, it is possible that the number of unique keys in a particular window is greater than the size of LRU cache. Whenever any (key, value) pair arrives, if the key already exists, its aggregated value is updated with the value of the newly arrived value and its latest arrival time is updated with the current event time. If the key does not exist and there is some free slot in the LRU cache, it is added into the LRU. As soon as the LRU cache gets occupied fully and a new key comes in which does not exist in the LRU cache, we would like to emit the least recently used key to accommodate the newly arrived key. As in the case of 1, at the end of every tumbling window, all the (key, value) aggregated pairs in the LRU cache would be emitted.  

Would like to know how can we implement these algorithms using Flink. Any help would be greatly appreciated.

Dhruv Kumar
PhD Candidate
Department of Computer Science and Engineering
University of Minnesota
www.dhruvkumar.me


Re: Custom Processing per window

Posted by Dhruv Kumar <ga...@gmail.com>.
Is there a way I can leverage OperatorState (instead of KeyState) to solve my issue?


> On Mar 19, 2018, at 09:00, Fabian Hueske <fh...@gmail.com> wrote:
> 
> Hi,
> 
> Data is partitioned by key across machines and state is kept per key. It is not possible to interact with two keys at the same time.
> 
> Best, Fabian
> 
> 2018-03-19 14:47 GMT+01:00 Dhruv Kumar <gargdhruv36@gmail.com <ma...@gmail.com>>:
> In other words, while using the Flink streaming APIs, is it possible to take a decision on emitting a particular key based on the state of some other key present in the same window?
> 
> Thanks!
> --------------------------------------------------
> Dhruv Kumar
> PhD Candidate
> Department of Computer Science and Engineering
> University of Minnesota
> www.dhruvkumar.me <http://www.dhruvkumar.me/>
> 
>> On Mar 19, 2018, at 05:11, Dhruv Kumar <gargdhruv36@gmail.com <ma...@gmail.com>> wrote:
>> 
>> Task 1: I implemented it using a custom Trigger (see attached file). Looks like it is doing what I want it to. I copied the code from EventTimeTrigger.java and overwrote the onElement method. 
>> 
>> Task 2: I will need to maintain the state (this will be the LRU cache) for multiple keys in the same data structure. But it looks like that the Keyed states are on a per key basis. Should I use OperatorState in some way? Can I use a data structure not directly managed by Flink? What will happen in the case of keys across multiple machines?
>> 
>> <LazyAlgoTrigger.java>
>> 
>> 
>> Dhruv Kumar
>> PhD Candidate
>> Department of Computer Science and Engineering
>> University of Minnesota
>> www.dhruvkumar.me <http://www.dhruvkumar.me/>
>> 
>>> On Mar 19, 2018, at 02:04, Jörn Franke <jornfranke@gmail.com <ma...@gmail.com>> wrote:
>>> 
>>> How would you start implementing it? Where are you stuck?
>>> 
>>> Did you already try to implement this?
>>> 
>>> On 18. Mar 2018, at 04:10, Dhruv Kumar <gargdhruv36@gmail.com <ma...@gmail.com>> wrote:
>>> 
>>>> Hi
>>>> 
>>>> I am a CS PhD student at UMN Twin Cities. I am trying to use Flink for implementing some very specific use-cases: (They may not seem relevant but I need to implement them or I at least need to know if it is possible to implement them in Flink)
>>>> 
>>>> Assumptions:
>>>> 1. Data stream is of the form (key, value). We achieve this by the .key operation provided by Flink API.
>>>> 2. By emitting a key, I mean sending/outputting its aggregated value to any data sink. 
>>>> 
>>>> 1. For each Tumbling window in the Event Time space, for each key, I would like to aggregate its value until it crosses a particular threshold (same threshold for all the keys). As soon as the key’s aggregated value crosses this threshold, I would like to emit this key. At the end of every tumbling window, all the (key, value) aggregated pairs  would be emitted irrespective of whether they have crossed the threshold or not.
>>>> 
>>>> 2. For each Tumbling window in the event time space, I would like to maintain a LRU cache which stores the keys along with their aggregated values and their latest arrival time. The least recently used (LRU) key would be the key whose latest arrival time is earlier than the latest arrival times of all the other keys present in the LRU cache. The LRU cache is of a limited size. So, it is possible that the number of unique keys in a particular window is greater than the size of LRU cache. Whenever any (key, value) pair arrives, if the key already exists, its aggregated value is updated with the value of the newly arrived value and its latest arrival time is updated with the current event time. If the key does not exist and there is some free slot in the LRU cache, it is added into the LRU. As soon as the LRU cache gets occupied fully and a new key comes in which does not exist in the LRU cache, we would like to emit the least recently used key to accommodate the newly arrived key. As in the case of 1, at the end of every tumbling window, all the (key, value) aggregated pairs in the LRU cache would be emitted.  
>>>> 
>>>> Would like to know how can we implement these algorithms using Flink. Any help would be greatly appreciated.
>>>> 
>>>> Dhruv Kumar
>>>> PhD Candidate
>>>> Department of Computer Science and Engineering
>>>> University of Minnesota
>>>> www.dhruvkumar.me <http://www.dhruvkumar.me/>
>> 
> 
> 


Re: Custom Processing per window

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

Data is partitioned by key across machines and state is kept per key. It is
not possible to interact with two keys at the same time.

Best, Fabian

2018-03-19 14:47 GMT+01:00 Dhruv Kumar <ga...@gmail.com>:

> In other words, while using the Flink streaming APIs, is it possible to
> take a decision on emitting a particular key based on the state of some
> other key present in the same window?
>
> Thanks!
> --------------------------------------------------
> *Dhruv Kumar*
> PhD Candidate
> Department of Computer Science and Engineering
> University of Minnesota
> www.dhruvkumar.me
>
> On Mar 19, 2018, at 05:11, Dhruv Kumar <ga...@gmail.com> wrote:
>
> Task 1: I implemented it using a custom Trigger (see attached file). Looks
> like it is doing what I want it to. I copied the code from
> EventTimeTrigger.java and overwrote the *onElement* method.
>
> Task 2: I will need to maintain the state (this will be the LRU cache) for
> multiple keys in the same data structure. But it looks like that the Keyed
> states are on a per key basis. Should I use OperatorState in some way? Can
> I use a data structure not directly managed by Flink? What will happen in
> the case of keys across multiple machines?
>
> <LazyAlgoTrigger.java>
>
>
> *Dhruv Kumar*
> PhD Candidate
> Department of Computer Science and Engineering
> University of Minnesota
> www.dhruvkumar.me
>
> On Mar 19, 2018, at 02:04, Jörn Franke <jo...@gmail.com> wrote:
>
> How would you start implementing it? Where are you stuck?
>
> Did you already try to implement this?
>
> On 18. Mar 2018, at 04:10, Dhruv Kumar <ga...@gmail.com> wrote:
>
> Hi
>
> I am a CS PhD student at UMN Twin Cities. I am trying to use Flink for
> implementing some very specific use-cases: (They may not seem relevant but
> I need to implement them or I at least need to know if it is possible to
> implement them in Flink)
>
> Assumptions:
> 1. Data stream is of the form (key, value). We achieve this by the *.key*
> operation provided by Flink API.
> 2. By emitting a key, I mean sending/outputting its aggregated value to
> any data sink.
>
> 1. For each Tumbling window in the Event Time space, for each key, I would
> like to aggregate its value until it crosses a particular threshold (same
> threshold for all the keys). As soon as the key’s aggregated value crosses
> this threshold, I would like to emit this key. At the end of every tumbling
> window, all the (key, value) aggregated pairs  would be emitted
> irrespective of whether they have crossed the threshold or not.
>
> 2. For each Tumbling window in the event time space, I would like to
> maintain a LRU cache which stores the keys along with their aggregated
> values and their latest arrival time. The least recently used (LRU) key
> would be the key whose latest arrival time is earlier than the latest
> arrival times of all the other keys present in the LRU cache. The LRU cache
> is of a limited size. So, it is possible that the number of unique keys in
> a particular window is greater than the size of LRU cache. Whenever any
> (key, value) pair arrives, if the key already exists, its aggregated value
> is updated with the value of the newly arrived value and its latest arrival
> time is updated with the current event time. If the key does not exist and
> there is some free slot in the LRU cache, it is added into the LRU. As soon
> as the LRU cache gets occupied fully and a new key comes in which does not
> exist in the LRU cache, we would like to emit the least recently used key
> to accommodate the newly arrived key. As in the case of 1, at the end of
> every tumbling window, all the (key, value) aggregated pairs in the LRU
> cache would be emitted.
>
> Would like to know how can we implement these algorithms using Flink. Any
> help would be greatly appreciated.
>
> *Dhruv Kumar*
> PhD Candidate
> Department of Computer Science and Engineering
> University of Minnesota
> www.dhruvkumar.me
>
>
>
>

Re: Custom Processing per window

Posted by Dhruv Kumar <ga...@gmail.com>.
In other words, while using the Flink streaming APIs, is it possible to take a decision on emitting a particular key based on the state of some other key present in the same window?

Thanks!
--------------------------------------------------
Dhruv Kumar
PhD Candidate
Department of Computer Science and Engineering
University of Minnesota
www.dhruvkumar.me

> On Mar 19, 2018, at 05:11, Dhruv Kumar <ga...@gmail.com> wrote:
> 
> Task 1: I implemented it using a custom Trigger (see attached file). Looks like it is doing what I want it to. I copied the code from EventTimeTrigger.java and overwrote the onElement method. 
> 
> Task 2: I will need to maintain the state (this will be the LRU cache) for multiple keys in the same data structure. But it looks like that the Keyed states are on a per key basis. Should I use OperatorState in some way? Can I use a data structure not directly managed by Flink? What will happen in the case of keys across multiple machines?
> 
> <LazyAlgoTrigger.java>
> 
> 
> Dhruv Kumar
> PhD Candidate
> Department of Computer Science and Engineering
> University of Minnesota
> www.dhruvkumar.me <http://www.dhruvkumar.me/>
> 
>> On Mar 19, 2018, at 02:04, Jörn Franke <jornfranke@gmail.com <ma...@gmail.com>> wrote:
>> 
>> How would you start implementing it? Where are you stuck?
>> 
>> Did you already try to implement this?
>> 
>> On 18. Mar 2018, at 04:10, Dhruv Kumar <gargdhruv36@gmail.com <ma...@gmail.com>> wrote:
>> 
>>> Hi
>>> 
>>> I am a CS PhD student at UMN Twin Cities. I am trying to use Flink for implementing some very specific use-cases: (They may not seem relevant but I need to implement them or I at least need to know if it is possible to implement them in Flink)
>>> 
>>> Assumptions:
>>> 1. Data stream is of the form (key, value). We achieve this by the .key operation provided by Flink API.
>>> 2. By emitting a key, I mean sending/outputting its aggregated value to any data sink. 
>>> 
>>> 1. For each Tumbling window in the Event Time space, for each key, I would like to aggregate its value until it crosses a particular threshold (same threshold for all the keys). As soon as the key’s aggregated value crosses this threshold, I would like to emit this key. At the end of every tumbling window, all the (key, value) aggregated pairs  would be emitted irrespective of whether they have crossed the threshold or not.
>>> 
>>> 2. For each Tumbling window in the event time space, I would like to maintain a LRU cache which stores the keys along with their aggregated values and their latest arrival time. The least recently used (LRU) key would be the key whose latest arrival time is earlier than the latest arrival times of all the other keys present in the LRU cache. The LRU cache is of a limited size. So, it is possible that the number of unique keys in a particular window is greater than the size of LRU cache. Whenever any (key, value) pair arrives, if the key already exists, its aggregated value is updated with the value of the newly arrived value and its latest arrival time is updated with the current event time. If the key does not exist and there is some free slot in the LRU cache, it is added into the LRU. As soon as the LRU cache gets occupied fully and a new key comes in which does not exist in the LRU cache, we would like to emit the least recently used key to accommodate the newly arrived key. As in the case of 1, at the end of every tumbling window, all the (key, value) aggregated pairs in the LRU cache would be emitted.  
>>> 
>>> Would like to know how can we implement these algorithms using Flink. Any help would be greatly appreciated.
>>> 
>>> Dhruv Kumar
>>> PhD Candidate
>>> Department of Computer Science and Engineering
>>> University of Minnesota
>>> www.dhruvkumar.me <http://www.dhruvkumar.me/>
> 


Re: Custom Processing per window

Posted by Dhruv Kumar <ga...@gmail.com>.
Task 1: I implemented it using a custom Trigger (see attached file). Looks like it is doing what I want it to. I copied the code from EventTimeTrigger.java and overwrote the onElement method. 

Task 2: I will need to maintain the state (this will be the LRU cache) for multiple keys in the same data structure. But it looks like that the Keyed states are on a per key basis. Should I use OperatorState in some way? Can I use a data structure not directly managed by Flink? What will happen in the case of keys across multiple machines?




Dhruv Kumar
PhD Candidate
Department of Computer Science and Engineering
University of Minnesota
www.dhruvkumar.me

> On Mar 19, 2018, at 02:04, Jörn Franke <jo...@gmail.com> wrote:
> 
> How would you start implementing it? Where are you stuck?
> 
> Did you already try to implement this?
> 
> On 18. Mar 2018, at 04:10, Dhruv Kumar <gargdhruv36@gmail.com <ma...@gmail.com>> wrote:
> 
>> Hi
>> 
>> I am a CS PhD student at UMN Twin Cities. I am trying to use Flink for implementing some very specific use-cases: (They may not seem relevant but I need to implement them or I at least need to know if it is possible to implement them in Flink)
>> 
>> Assumptions:
>> 1. Data stream is of the form (key, value). We achieve this by the .key operation provided by Flink API.
>> 2. By emitting a key, I mean sending/outputting its aggregated value to any data sink. 
>> 
>> 1. For each Tumbling window in the Event Time space, for each key, I would like to aggregate its value until it crosses a particular threshold (same threshold for all the keys). As soon as the key’s aggregated value crosses this threshold, I would like to emit this key. At the end of every tumbling window, all the (key, value) aggregated pairs  would be emitted irrespective of whether they have crossed the threshold or not.
>> 
>> 2. For each Tumbling window in the event time space, I would like to maintain a LRU cache which stores the keys along with their aggregated values and their latest arrival time. The least recently used (LRU) key would be the key whose latest arrival time is earlier than the latest arrival times of all the other keys present in the LRU cache. The LRU cache is of a limited size. So, it is possible that the number of unique keys in a particular window is greater than the size of LRU cache. Whenever any (key, value) pair arrives, if the key already exists, its aggregated value is updated with the value of the newly arrived value and its latest arrival time is updated with the current event time. If the key does not exist and there is some free slot in the LRU cache, it is added into the LRU. As soon as the LRU cache gets occupied fully and a new key comes in which does not exist in the LRU cache, we would like to emit the least recently used key to accommodate the newly arrived key. As in the case of 1, at the end of every tumbling window, all the (key, value) aggregated pairs in the LRU cache would be emitted.  
>> 
>> Would like to know how can we implement these algorithms using Flink. Any help would be greatly appreciated.
>> 
>> Dhruv Kumar
>> PhD Candidate
>> Department of Computer Science and Engineering
>> University of Minnesota
>> www.dhruvkumar.me <http://www.dhruvkumar.me/>


Re: Custom Processing per window

Posted by Jörn Franke <jo...@gmail.com>.
How would you start implementing it? Where are you stuck?

Did you already try to implement this?

> On 18. Mar 2018, at 04:10, Dhruv Kumar <ga...@gmail.com> wrote:
> 
> Hi
> 
> I am a CS PhD student at UMN Twin Cities. I am trying to use Flink for implementing some very specific use-cases: (They may not seem relevant but I need to implement them or I at least need to know if it is possible to implement them in Flink)
> 
> Assumptions:
> 1. Data stream is of the form (key, value). We achieve this by the .key operation provided by Flink API.
> 2. By emitting a key, I mean sending/outputting its aggregated value to any data sink. 
> 
> 1. For each Tumbling window in the Event Time space, for each key, I would like to aggregate its value until it crosses a particular threshold (same threshold for all the keys). As soon as the key’s aggregated value crosses this threshold, I would like to emit this key. At the end of every tumbling window, all the (key, value) aggregated pairs  would be emitted irrespective of whether they have crossed the threshold or not.
> 
> 2. For each Tumbling window in the event time space, I would like to maintain a LRU cache which stores the keys along with their aggregated values and their latest arrival time. The least recently used (LRU) key would be the key whose latest arrival time is earlier than the latest arrival times of all the other keys present in the LRU cache. The LRU cache is of a limited size. So, it is possible that the number of unique keys in a particular window is greater than the size of LRU cache. Whenever any (key, value) pair arrives, if the key already exists, its aggregated value is updated with the value of the newly arrived value and its latest arrival time is updated with the current event time. If the key does not exist and there is some free slot in the LRU cache, it is added into the LRU. As soon as the LRU cache gets occupied fully and a new key comes in which does not exist in the LRU cache, we would like to emit the least recently used key to accommodate the newly arrived key. As in the case of 1, at the end of every tumbling window, all the (key, value) aggregated pairs in the LRU cache would be emitted.  
> 
> Would like to know how can we implement these algorithms using Flink. Any help would be greatly appreciated.
> 
> Dhruv Kumar
> PhD Candidate
> Department of Computer Science and Engineering
> University of Minnesota
> www.dhruvkumar.me
>