You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Stephen Connolly <st...@gmail.com> on 2019/02/08 12:21:46 UTC

Reduce one event under multiple keys

Ok, I'll try and map my problem into something that should be familiar to
most people.

Consider collection of PCs, each of which has a unique ID, e.g.
ca:fe:ba:be, de:ad:be:ef, etc.

Each PC has a tree of local files. Some of the file paths are
coincidentally the same names, but there is no file sharing between PCs.

I need to produce metrics about how often files are opened and how long
they are open for.

I need for every X minute tumbling window not just the cumulative averages
for each PC, but the averages for each file as well as the cumulative
averegaes for each folder and their sub-folders.

I have a stream of events like

{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}{"source":"de:ad:be:ef","action":"open","path":"/foo/manchu/README.txt"}
{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User
guide.txt"}{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin
guide.txt"}{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/User
guide.txt","duration":"97"}{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/Admin
guide.txt","duration":"196"}
{"source":"ca:fe:ba:be","action":"open","path":"/foo/manchu/README.txt"}
{"source":"de:ad:be:ef","action":"open","path":"/bar/foo/README.txt"}

So from that I would like to know stuff like:

ca:fe:ba:be had 4/X opens per minute in the X minute window
ca:fe:ba:be had 3/X closes per minute in the X minute window and the
average time open was (67+97+197)/3=120... there is no guarantee that the
closes will be matched with opens in the same window, which is why I'm only
tracking them separately
de:ad:be:ef had 2/X opens per minute in the X minute window
ca:fe:ba:be /foo had 4/X opens per minute in the X minute window
ca:fe:ba:be /foo had 3/X closes per minute in the X minute window and the
average time open was 120
de:ad:be:ef /foo had 1/X opens per minute in the X minute window
de:ad:be:ef /bar had 1/X opens per minute in the X minute window
de:ad:be:ef /foo/manchu had 1/X opens per minute in the X minute window
de:ad:be:ef /bar/foo had 1/X opens per minute in the X minute window
de:ad:be:ef /foo/manchu/README.txt had 1/X opens per minute in the X minute
window
de:ad:be:ef /bar/foo/README.txt had 1/X opens per minute in the X minute
window
etc

What I think I want to do is turn each event into a series of events with
different keys, so that

{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}

gets sent under the keys:

("ca:fe:ba:be","/")
("ca:fe:ba:be","/foo")
("ca:fe:ba:be","/foo/bar")
("ca:fe:ba:be","/foo/bar/README.txt")

Then I could use a window aggregation function to just:

* count the "open" events
* count the "close" events and sum their duration

Additionally, I am (naïevely) hoping that if a window has no events for a
particular key, the memory/storage costs are zero for that key.

From what I can see, to achieve what I am trying to do, I could use a
flatMap followed by a keyBy

In other words I take the events and flat map them based on the path split
on '/' returning a Tuple of the (to be) key and the event. Then I can use
keyBy to key based on the Tuple 0.

My ask:

Is the above design a good design? How would you achieve the end game
better? Do I need to worry about many paths that are accessed rarely and
would have an accumulator function that stays at 0 unless there are events
in that window... or are the accumulators for each distinct key eagerly
purged after each fire trigger.

What gotcha's do I need to look for.

Thanks in advance and appologies for the length

-stephenc

Re: Reduce one event under multiple keys

Posted by Stephen Connolly <st...@gmail.com>.
Thanks!

On Mon, 18 Feb 2019 at 12:36, Fabian Hueske <fh...@gmail.com> wrote:

> Hi Stephen,
>
> Sorry for the late response.
> If you don't need to match open and close events, your approach of using a
> flatMap to fan-out for the hierarchical folder structure and a window
> operator (or two for open and close) for counting and aggregating should be
> a good design.
>
> Best, Fabian
>
> Am Mo., 11. Feb. 2019 um 11:29 Uhr schrieb Stephen Connolly <
> stephen.alan.connolly@gmail.com>:
>
>>
>>
>> On Mon, 11 Feb 2019 at 09:42, Fabian Hueske <fh...@gmail.com> wrote:
>>
>>> Hi Stephen,
>>>
>>> A window is created with the first record that is assigned to it.
>>> If the windows are based on time and a key, than no window will be
>>> created (and not space be occupied) if there is not a first record for a
>>> key and time interval.
>>>
>>> Anyway, if tracking the number of open files & average opening time is
>>> your use case, you might want to implement the logic with a ProcessFunction
>>> instead of a window.
>>> The reason is that it is that time windows don't share state, i.e., the
>>> information about an opened but not yet closed file would not be "carried
>>> over" to the next window.
>>> However, if you use a ProcessFunction, you are responsible for cleaning
>>> up the state.
>>>
>>
>> Ahh but I am cheating by ensuring the events are rich enough that I do
>> not need to match them.
>>
>> I get the "open" (they are not really "open" events - I have mapped to an
>> analogy... it might be more like a build job start events... or not... I'm
>> not at liberty to say ;-) ) events because I need to count the number of
>> "open"s per time period.
>>
>> I get the "close" events and they include the duration plus other
>> information that can then be transformed into the required metrics... yes I
>> could derive the "open" from the "close" by subtracting the duration but:
>>
>> 1. they would cross window boundaries quite often, leading to repeated
>> fetch-update-write operations on the backing data store
>> 2. they wouldn't be as "live" and one of the things we need to know is
>> how many "open"s there are in the previous window... given some durations
>> can be many days, waiting for the "close" event to create the "open" metric
>> would not be a good plan.
>>
>> Basically, I am pushing some of the calculations to the edge where there
>> is state that makes those calculations cheap and then the rich events are
>> *hopefully* easy to aggregate with just simple aggregation functions that
>> only need to maintain the running total... at least that's what the PoC I
>> am experimenting with Flink should show
>>
>>
>>>
>>> Hope this helps,
>>> Fabian
>>>
>>> Am So., 10. Feb. 2019 um 20:36 Uhr schrieb Stephen Connolly <
>>> stephen.alan.connolly@gmail.com>:
>>>
>>>>
>>>>
>>>> On Sun, 10 Feb 2019 at 09:09, Chesnay Schepler <ch...@apache.org>
>>>> wrote:
>>>>
>>>>> This sounds reasonable to me.
>>>>>
>>>>> I'm a bit confused by this question: "*Additionally, I am (naïevely)
>>>>> hoping that if a window has no events for a particular key, the
>>>>> memory/storage costs are zero for that key.*"
>>>>>
>>>>> Are you asking whether a key that was received in window X (as part of
>>>>> an event) is still present in window x+1? If so, then the answer is no; a
>>>>> key will only be present in a given window if an event was received that
>>>>> fits into that window.
>>>>>
>>>>
>>>> To confirm:
>>>>
>>>> So let's say I'l tracking the average time a file is opened in folders.
>>>>
>>>> In window N we get the events:
>>>>
>>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}
>>>>
>>>> {"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}
>>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User
>>>> guide.txt"}
>>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin
>>>> guide.txt"}
>>>>
>>>> So there will be aggregates stored for
>>>> ("ca:fe:ba:be","/"), ("ca:fe:ba:be","/foo"), ("ca:fe:ba:be","/foo/bar"),
>>>> ("ca:fe:ba:be","/foo/bar/README.txt"), etc
>>>>
>>>> In window N+1 we do not get any events at all.
>>>>
>>>> So the memory used by my aggregation functions from window N will be
>>>> freed and the storage will be effectively zero (modulo any follow on
>>>> processing that might be on a longer window)
>>>>
>>>> This seems to be what you are saying... in which case my naïeve hope
>>>> was not so naïve! w00t!
>>>>
>>>>
>>>>>
>>>>> On 08.02.2019 13:21, Stephen Connolly wrote:
>>>>>
>>>>> Ok, I'll try and map my problem into something that should be familiar
>>>>> to most people.
>>>>>
>>>>> Consider collection of PCs, each of which has a unique ID, e.g.
>>>>> ca:fe:ba:be, de:ad:be:ef, etc.
>>>>>
>>>>> Each PC has a tree of local files. Some of the file paths are
>>>>> coincidentally the same names, but there is no file sharing between PCs.
>>>>>
>>>>> I need to produce metrics about how often files are opened and how
>>>>> long they are open for.
>>>>>
>>>>> I need for every X minute tumbling window not just the cumulative
>>>>> averages for each PC, but the averages for each file as well as the
>>>>> cumulative averegaes for each folder and their sub-folders.
>>>>>
>>>>> I have a stream of events like
>>>>>
>>>>>
>>>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}{"source":"de:ad:be:ef","action":"open","path":"/foo/manchu/README.txt"}
>>>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User
>>>>> guide.txt"}{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin
>>>>> guide.txt"}{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/User
>>>>> guide.txt","duration":"97"}{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/Admin
>>>>> guide.txt","duration":"196"}
>>>>>
>>>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/manchu/README.txt"}
>>>>> {"source":"de:ad:be:ef","action":"open","path":"/bar/foo/README.txt"}
>>>>>
>>>>> So from that I would like to know stuff like:
>>>>>
>>>>> ca:fe:ba:be had 4/X opens per minute in the X minute window
>>>>> ca:fe:ba:be had 3/X closes per minute in the X minute window and the
>>>>> average time open was (67+97+197)/3=120... there is no guarantee that the
>>>>> closes will be matched with opens in the same window, which is why I'm only
>>>>> tracking them separately
>>>>> de:ad:be:ef had 2/X opens per minute in the X minute window
>>>>> ca:fe:ba:be /foo had 4/X opens per minute in the X minute window
>>>>> ca:fe:ba:be /foo had 3/X closes per minute in the X minute window and
>>>>> the average time open was 120
>>>>> de:ad:be:ef /foo had 1/X opens per minute in the X minute window
>>>>> de:ad:be:ef /bar had 1/X opens per minute in the X minute window
>>>>> de:ad:be:ef /foo/manchu had 1/X opens per minute in the X minute window
>>>>> de:ad:be:ef /bar/foo had 1/X opens per minute in the X minute window
>>>>> de:ad:be:ef /foo/manchu/README.txt had 1/X opens per minute in the X
>>>>> minute window
>>>>> de:ad:be:ef /bar/foo/README.txt had 1/X opens per minute in the X
>>>>> minute window
>>>>> etc
>>>>>
>>>>> What I think I want to do is turn each event into a series of events
>>>>> with different keys, so that
>>>>>
>>>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}
>>>>>
>>>>> gets sent under the keys:
>>>>>
>>>>> ("ca:fe:ba:be","/")
>>>>> ("ca:fe:ba:be","/foo")
>>>>> ("ca:fe:ba:be","/foo/bar")
>>>>> ("ca:fe:ba:be","/foo/bar/README.txt")
>>>>>
>>>>> Then I could use a window aggregation function to just:
>>>>>
>>>>> * count the "open" events
>>>>> * count the "close" events and sum their duration
>>>>>
>>>>> Additionally, I am (naïevely) hoping that if a window has no events
>>>>> for a particular key, the memory/storage costs are zero for that key.
>>>>>
>>>>> From what I can see, to achieve what I am trying to do, I could use a
>>>>> flatMap followed by a keyBy
>>>>>
>>>>> In other words I take the events and flat map them based on the path
>>>>> split on '/' returning a Tuple of the (to be) key and the event. Then I can
>>>>> use keyBy to key based on the Tuple 0.
>>>>>
>>>>> My ask:
>>>>>
>>>>> Is the above design a good design? How would you achieve the end game
>>>>> better? Do I need to worry about many paths that are accessed rarely and
>>>>> would have an accumulator function that stays at 0 unless there are events
>>>>> in that window... or are the accumulators for each distinct key eagerly
>>>>> purged after each fire trigger.
>>>>>
>>>>> What gotcha's do I need to look for.
>>>>>
>>>>> Thanks in advance and appologies for the length
>>>>>
>>>>> -stephenc
>>>>>
>>>>>
>>>>>

Re: Reduce one event under multiple keys

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Stephen,

Sorry for the late response.
If you don't need to match open and close events, your approach of using a
flatMap to fan-out for the hierarchical folder structure and a window
operator (or two for open and close) for counting and aggregating should be
a good design.

Best, Fabian

Am Mo., 11. Feb. 2019 um 11:29 Uhr schrieb Stephen Connolly <
stephen.alan.connolly@gmail.com>:

>
>
> On Mon, 11 Feb 2019 at 09:42, Fabian Hueske <fh...@gmail.com> wrote:
>
>> Hi Stephen,
>>
>> A window is created with the first record that is assigned to it.
>> If the windows are based on time and a key, than no window will be
>> created (and not space be occupied) if there is not a first record for a
>> key and time interval.
>>
>> Anyway, if tracking the number of open files & average opening time is
>> your use case, you might want to implement the logic with a ProcessFunction
>> instead of a window.
>> The reason is that it is that time windows don't share state, i.e., the
>> information about an opened but not yet closed file would not be "carried
>> over" to the next window.
>> However, if you use a ProcessFunction, you are responsible for cleaning
>> up the state.
>>
>
> Ahh but I am cheating by ensuring the events are rich enough that I do not
> need to match them.
>
> I get the "open" (they are not really "open" events - I have mapped to an
> analogy... it might be more like a build job start events... or not... I'm
> not at liberty to say ;-) ) events because I need to count the number of
> "open"s per time period.
>
> I get the "close" events and they include the duration plus other
> information that can then be transformed into the required metrics... yes I
> could derive the "open" from the "close" by subtracting the duration but:
>
> 1. they would cross window boundaries quite often, leading to repeated
> fetch-update-write operations on the backing data store
> 2. they wouldn't be as "live" and one of the things we need to know is how
> many "open"s there are in the previous window... given some durations can
> be many days, waiting for the "close" event to create the "open" metric
> would not be a good plan.
>
> Basically, I am pushing some of the calculations to the edge where there
> is state that makes those calculations cheap and then the rich events are
> *hopefully* easy to aggregate with just simple aggregation functions that
> only need to maintain the running total... at least that's what the PoC I
> am experimenting with Flink should show
>
>
>>
>> Hope this helps,
>> Fabian
>>
>> Am So., 10. Feb. 2019 um 20:36 Uhr schrieb Stephen Connolly <
>> stephen.alan.connolly@gmail.com>:
>>
>>>
>>>
>>> On Sun, 10 Feb 2019 at 09:09, Chesnay Schepler <ch...@apache.org>
>>> wrote:
>>>
>>>> This sounds reasonable to me.
>>>>
>>>> I'm a bit confused by this question: "*Additionally, I am (naïevely)
>>>> hoping that if a window has no events for a particular key, the
>>>> memory/storage costs are zero for that key.*"
>>>>
>>>> Are you asking whether a key that was received in window X (as part of
>>>> an event) is still present in window x+1? If so, then the answer is no; a
>>>> key will only be present in a given window if an event was received that
>>>> fits into that window.
>>>>
>>>
>>> To confirm:
>>>
>>> So let's say I'l tracking the average time a file is opened in folders.
>>>
>>> In window N we get the events:
>>>
>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}
>>>
>>> {"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}
>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User guide.txt"}
>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin
>>> guide.txt"}
>>>
>>> So there will be aggregates stored for
>>> ("ca:fe:ba:be","/"), ("ca:fe:ba:be","/foo"), ("ca:fe:ba:be","/foo/bar"),
>>> ("ca:fe:ba:be","/foo/bar/README.txt"), etc
>>>
>>> In window N+1 we do not get any events at all.
>>>
>>> So the memory used by my aggregation functions from window N will be
>>> freed and the storage will be effectively zero (modulo any follow on
>>> processing that might be on a longer window)
>>>
>>> This seems to be what you are saying... in which case my naïeve hope was
>>> not so naïve! w00t!
>>>
>>>
>>>>
>>>> On 08.02.2019 13:21, Stephen Connolly wrote:
>>>>
>>>> Ok, I'll try and map my problem into something that should be familiar
>>>> to most people.
>>>>
>>>> Consider collection of PCs, each of which has a unique ID, e.g.
>>>> ca:fe:ba:be, de:ad:be:ef, etc.
>>>>
>>>> Each PC has a tree of local files. Some of the file paths are
>>>> coincidentally the same names, but there is no file sharing between PCs.
>>>>
>>>> I need to produce metrics about how often files are opened and how long
>>>> they are open for.
>>>>
>>>> I need for every X minute tumbling window not just the cumulative
>>>> averages for each PC, but the averages for each file as well as the
>>>> cumulative averegaes for each folder and their sub-folders.
>>>>
>>>> I have a stream of events like
>>>>
>>>>
>>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}{"source":"de:ad:be:ef","action":"open","path":"/foo/manchu/README.txt"}
>>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User
>>>> guide.txt"}{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin
>>>> guide.txt"}{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/User
>>>> guide.txt","duration":"97"}{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/Admin
>>>> guide.txt","duration":"196"}
>>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/manchu/README.txt"}
>>>> {"source":"de:ad:be:ef","action":"open","path":"/bar/foo/README.txt"}
>>>>
>>>> So from that I would like to know stuff like:
>>>>
>>>> ca:fe:ba:be had 4/X opens per minute in the X minute window
>>>> ca:fe:ba:be had 3/X closes per minute in the X minute window and the
>>>> average time open was (67+97+197)/3=120... there is no guarantee that the
>>>> closes will be matched with opens in the same window, which is why I'm only
>>>> tracking them separately
>>>> de:ad:be:ef had 2/X opens per minute in the X minute window
>>>> ca:fe:ba:be /foo had 4/X opens per minute in the X minute window
>>>> ca:fe:ba:be /foo had 3/X closes per minute in the X minute window and
>>>> the average time open was 120
>>>> de:ad:be:ef /foo had 1/X opens per minute in the X minute window
>>>> de:ad:be:ef /bar had 1/X opens per minute in the X minute window
>>>> de:ad:be:ef /foo/manchu had 1/X opens per minute in the X minute window
>>>> de:ad:be:ef /bar/foo had 1/X opens per minute in the X minute window
>>>> de:ad:be:ef /foo/manchu/README.txt had 1/X opens per minute in the X
>>>> minute window
>>>> de:ad:be:ef /bar/foo/README.txt had 1/X opens per minute in the X
>>>> minute window
>>>> etc
>>>>
>>>> What I think I want to do is turn each event into a series of events
>>>> with different keys, so that
>>>>
>>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}
>>>>
>>>> gets sent under the keys:
>>>>
>>>> ("ca:fe:ba:be","/")
>>>> ("ca:fe:ba:be","/foo")
>>>> ("ca:fe:ba:be","/foo/bar")
>>>> ("ca:fe:ba:be","/foo/bar/README.txt")
>>>>
>>>> Then I could use a window aggregation function to just:
>>>>
>>>> * count the "open" events
>>>> * count the "close" events and sum their duration
>>>>
>>>> Additionally, I am (naïevely) hoping that if a window has no events for
>>>> a particular key, the memory/storage costs are zero for that key.
>>>>
>>>> From what I can see, to achieve what I am trying to do, I could use a
>>>> flatMap followed by a keyBy
>>>>
>>>> In other words I take the events and flat map them based on the path
>>>> split on '/' returning a Tuple of the (to be) key and the event. Then I can
>>>> use keyBy to key based on the Tuple 0.
>>>>
>>>> My ask:
>>>>
>>>> Is the above design a good design? How would you achieve the end game
>>>> better? Do I need to worry about many paths that are accessed rarely and
>>>> would have an accumulator function that stays at 0 unless there are events
>>>> in that window... or are the accumulators for each distinct key eagerly
>>>> purged after each fire trigger.
>>>>
>>>> What gotcha's do I need to look for.
>>>>
>>>> Thanks in advance and appologies for the length
>>>>
>>>> -stephenc
>>>>
>>>>
>>>>

Re: Reduce one event under multiple keys

Posted by Stephen Connolly <st...@gmail.com>.
On Mon, 11 Feb 2019 at 09:42, Fabian Hueske <fh...@gmail.com> wrote:

> Hi Stephen,
>
> A window is created with the first record that is assigned to it.
> If the windows are based on time and a key, than no window will be created
> (and not space be occupied) if there is not a first record for a key and
> time interval.
>
> Anyway, if tracking the number of open files & average opening time is
> your use case, you might want to implement the logic with a ProcessFunction
> instead of a window.
> The reason is that it is that time windows don't share state, i.e., the
> information about an opened but not yet closed file would not be "carried
> over" to the next window.
> However, if you use a ProcessFunction, you are responsible for cleaning up
> the state.
>

Ahh but I am cheating by ensuring the events are rich enough that I do not
need to match them.

I get the "open" (they are not really "open" events - I have mapped to an
analogy... it might be more like a build job start events... or not... I'm
not at liberty to say ;-) ) events because I need to count the number of
"open"s per time period.

I get the "close" events and they include the duration plus other
information that can then be transformed into the required metrics... yes I
could derive the "open" from the "close" by subtracting the duration but:

1. they would cross window boundaries quite often, leading to repeated
fetch-update-write operations on the backing data store
2. they wouldn't be as "live" and one of the things we need to know is how
many "open"s there are in the previous window... given some durations can
be many days, waiting for the "close" event to create the "open" metric
would not be a good plan.

Basically, I am pushing some of the calculations to the edge where there is
state that makes those calculations cheap and then the rich events are
*hopefully* easy to aggregate with just simple aggregation functions that
only need to maintain the running total... at least that's what the PoC I
am experimenting with Flink should show


>
> Hope this helps,
> Fabian
>
> Am So., 10. Feb. 2019 um 20:36 Uhr schrieb Stephen Connolly <
> stephen.alan.connolly@gmail.com>:
>
>>
>>
>> On Sun, 10 Feb 2019 at 09:09, Chesnay Schepler <ch...@apache.org>
>> wrote:
>>
>>> This sounds reasonable to me.
>>>
>>> I'm a bit confused by this question: "*Additionally, I am (naïevely)
>>> hoping that if a window has no events for a particular key, the
>>> memory/storage costs are zero for that key.*"
>>>
>>> Are you asking whether a key that was received in window X (as part of
>>> an event) is still present in window x+1? If so, then the answer is no; a
>>> key will only be present in a given window if an event was received that
>>> fits into that window.
>>>
>>
>> To confirm:
>>
>> So let's say I'l tracking the average time a file is opened in folders.
>>
>> In window N we get the events:
>>
>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}
>>
>> {"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}
>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User guide.txt"}
>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin guide.txt"}
>>
>> So there will be aggregates stored for
>> ("ca:fe:ba:be","/"), ("ca:fe:ba:be","/foo"), ("ca:fe:ba:be","/foo/bar"),
>> ("ca:fe:ba:be","/foo/bar/README.txt"), etc
>>
>> In window N+1 we do not get any events at all.
>>
>> So the memory used by my aggregation functions from window N will be
>> freed and the storage will be effectively zero (modulo any follow on
>> processing that might be on a longer window)
>>
>> This seems to be what you are saying... in which case my naïeve hope was
>> not so naïve! w00t!
>>
>>
>>>
>>> On 08.02.2019 13:21, Stephen Connolly wrote:
>>>
>>> Ok, I'll try and map my problem into something that should be familiar
>>> to most people.
>>>
>>> Consider collection of PCs, each of which has a unique ID, e.g.
>>> ca:fe:ba:be, de:ad:be:ef, etc.
>>>
>>> Each PC has a tree of local files. Some of the file paths are
>>> coincidentally the same names, but there is no file sharing between PCs.
>>>
>>> I need to produce metrics about how often files are opened and how long
>>> they are open for.
>>>
>>> I need for every X minute tumbling window not just the cumulative
>>> averages for each PC, but the averages for each file as well as the
>>> cumulative averegaes for each folder and their sub-folders.
>>>
>>> I have a stream of events like
>>>
>>>
>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}{"source":"de:ad:be:ef","action":"open","path":"/foo/manchu/README.txt"}
>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User
>>> guide.txt"}{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin
>>> guide.txt"}{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/User
>>> guide.txt","duration":"97"}{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/Admin
>>> guide.txt","duration":"196"}
>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/manchu/README.txt"}
>>> {"source":"de:ad:be:ef","action":"open","path":"/bar/foo/README.txt"}
>>>
>>> So from that I would like to know stuff like:
>>>
>>> ca:fe:ba:be had 4/X opens per minute in the X minute window
>>> ca:fe:ba:be had 3/X closes per minute in the X minute window and the
>>> average time open was (67+97+197)/3=120... there is no guarantee that the
>>> closes will be matched with opens in the same window, which is why I'm only
>>> tracking them separately
>>> de:ad:be:ef had 2/X opens per minute in the X minute window
>>> ca:fe:ba:be /foo had 4/X opens per minute in the X minute window
>>> ca:fe:ba:be /foo had 3/X closes per minute in the X minute window and
>>> the average time open was 120
>>> de:ad:be:ef /foo had 1/X opens per minute in the X minute window
>>> de:ad:be:ef /bar had 1/X opens per minute in the X minute window
>>> de:ad:be:ef /foo/manchu had 1/X opens per minute in the X minute window
>>> de:ad:be:ef /bar/foo had 1/X opens per minute in the X minute window
>>> de:ad:be:ef /foo/manchu/README.txt had 1/X opens per minute in the X
>>> minute window
>>> de:ad:be:ef /bar/foo/README.txt had 1/X opens per minute in the X minute
>>> window
>>> etc
>>>
>>> What I think I want to do is turn each event into a series of events
>>> with different keys, so that
>>>
>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}
>>>
>>> gets sent under the keys:
>>>
>>> ("ca:fe:ba:be","/")
>>> ("ca:fe:ba:be","/foo")
>>> ("ca:fe:ba:be","/foo/bar")
>>> ("ca:fe:ba:be","/foo/bar/README.txt")
>>>
>>> Then I could use a window aggregation function to just:
>>>
>>> * count the "open" events
>>> * count the "close" events and sum their duration
>>>
>>> Additionally, I am (naïevely) hoping that if a window has no events for
>>> a particular key, the memory/storage costs are zero for that key.
>>>
>>> From what I can see, to achieve what I am trying to do, I could use a
>>> flatMap followed by a keyBy
>>>
>>> In other words I take the events and flat map them based on the path
>>> split on '/' returning a Tuple of the (to be) key and the event. Then I can
>>> use keyBy to key based on the Tuple 0.
>>>
>>> My ask:
>>>
>>> Is the above design a good design? How would you achieve the end game
>>> better? Do I need to worry about many paths that are accessed rarely and
>>> would have an accumulator function that stays at 0 unless there are events
>>> in that window... or are the accumulators for each distinct key eagerly
>>> purged after each fire trigger.
>>>
>>> What gotcha's do I need to look for.
>>>
>>> Thanks in advance and appologies for the length
>>>
>>> -stephenc
>>>
>>>
>>>

Re: Reduce one event under multiple keys

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Stephen,

A window is created with the first record that is assigned to it.
If the windows are based on time and a key, than no window will be created
(and not space be occupied) if there is not a first record for a key and
time interval.

Anyway, if tracking the number of open files & average opening time is your
use case, you might want to implement the logic with a ProcessFunction
instead of a window.
The reason is that it is that time windows don't share state, i.e., the
information about an opened but not yet closed file would not be "carried
over" to the next window.
However, if you use a ProcessFunction, you are responsible for cleaning up
the state.

Hope this helps,
Fabian

Am So., 10. Feb. 2019 um 20:36 Uhr schrieb Stephen Connolly <
stephen.alan.connolly@gmail.com>:

>
>
> On Sun, 10 Feb 2019 at 09:09, Chesnay Schepler <ch...@apache.org> wrote:
>
>> This sounds reasonable to me.
>>
>> I'm a bit confused by this question: "*Additionally, I am (naïevely)
>> hoping that if a window has no events for a particular key, the
>> memory/storage costs are zero for that key.*"
>>
>> Are you asking whether a key that was received in window X (as part of an
>> event) is still present in window x+1? If so, then the answer is no; a key
>> will only be present in a given window if an event was received that fits
>> into that window.
>>
>
> To confirm:
>
> So let's say I'l tracking the average time a file is opened in folders.
>
> In window N we get the events:
>
> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}
>
> {"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}
> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User guide.txt"}
> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin guide.txt"}
>
> So there will be aggregates stored for
> ("ca:fe:ba:be","/"), ("ca:fe:ba:be","/foo"), ("ca:fe:ba:be","/foo/bar"),
> ("ca:fe:ba:be","/foo/bar/README.txt"), etc
>
> In window N+1 we do not get any events at all.
>
> So the memory used by my aggregation functions from window N will be freed
> and the storage will be effectively zero (modulo any follow on processing
> that might be on a longer window)
>
> This seems to be what you are saying... in which case my naïeve hope was
> not so naïve! w00t!
>
>
>>
>> On 08.02.2019 13:21, Stephen Connolly wrote:
>>
>> Ok, I'll try and map my problem into something that should be familiar to
>> most people.
>>
>> Consider collection of PCs, each of which has a unique ID, e.g.
>> ca:fe:ba:be, de:ad:be:ef, etc.
>>
>> Each PC has a tree of local files. Some of the file paths are
>> coincidentally the same names, but there is no file sharing between PCs.
>>
>> I need to produce metrics about how often files are opened and how long
>> they are open for.
>>
>> I need for every X minute tumbling window not just the cumulative
>> averages for each PC, but the averages for each file as well as the
>> cumulative averegaes for each folder and their sub-folders.
>>
>> I have a stream of events like
>>
>>
>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}{"source":"de:ad:be:ef","action":"open","path":"/foo/manchu/README.txt"}
>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User
>> guide.txt"}{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin
>> guide.txt"}{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/User
>> guide.txt","duration":"97"}{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/Admin
>> guide.txt","duration":"196"}
>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/manchu/README.txt"}
>> {"source":"de:ad:be:ef","action":"open","path":"/bar/foo/README.txt"}
>>
>> So from that I would like to know stuff like:
>>
>> ca:fe:ba:be had 4/X opens per minute in the X minute window
>> ca:fe:ba:be had 3/X closes per minute in the X minute window and the
>> average time open was (67+97+197)/3=120... there is no guarantee that the
>> closes will be matched with opens in the same window, which is why I'm only
>> tracking them separately
>> de:ad:be:ef had 2/X opens per minute in the X minute window
>> ca:fe:ba:be /foo had 4/X opens per minute in the X minute window
>> ca:fe:ba:be /foo had 3/X closes per minute in the X minute window and the
>> average time open was 120
>> de:ad:be:ef /foo had 1/X opens per minute in the X minute window
>> de:ad:be:ef /bar had 1/X opens per minute in the X minute window
>> de:ad:be:ef /foo/manchu had 1/X opens per minute in the X minute window
>> de:ad:be:ef /bar/foo had 1/X opens per minute in the X minute window
>> de:ad:be:ef /foo/manchu/README.txt had 1/X opens per minute in the X
>> minute window
>> de:ad:be:ef /bar/foo/README.txt had 1/X opens per minute in the X minute
>> window
>> etc
>>
>> What I think I want to do is turn each event into a series of events with
>> different keys, so that
>>
>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}
>>
>> gets sent under the keys:
>>
>> ("ca:fe:ba:be","/")
>> ("ca:fe:ba:be","/foo")
>> ("ca:fe:ba:be","/foo/bar")
>> ("ca:fe:ba:be","/foo/bar/README.txt")
>>
>> Then I could use a window aggregation function to just:
>>
>> * count the "open" events
>> * count the "close" events and sum their duration
>>
>> Additionally, I am (naïevely) hoping that if a window has no events for a
>> particular key, the memory/storage costs are zero for that key.
>>
>> From what I can see, to achieve what I am trying to do, I could use a
>> flatMap followed by a keyBy
>>
>> In other words I take the events and flat map them based on the path
>> split on '/' returning a Tuple of the (to be) key and the event. Then I can
>> use keyBy to key based on the Tuple 0.
>>
>> My ask:
>>
>> Is the above design a good design? How would you achieve the end game
>> better? Do I need to worry about many paths that are accessed rarely and
>> would have an accumulator function that stays at 0 unless there are events
>> in that window... or are the accumulators for each distinct key eagerly
>> purged after each fire trigger.
>>
>> What gotcha's do I need to look for.
>>
>> Thanks in advance and appologies for the length
>>
>> -stephenc
>>
>>
>>

Re: Reduce one event under multiple keys

Posted by Stephen Connolly <st...@gmail.com>.
On Sun, 10 Feb 2019 at 09:09, Chesnay Schepler <ch...@apache.org> wrote:

> This sounds reasonable to me.
>
> I'm a bit confused by this question: "*Additionally, I am (naïevely)
> hoping that if a window has no events for a particular key, the
> memory/storage costs are zero for that key.*"
>
> Are you asking whether a key that was received in window X (as part of an
> event) is still present in window x+1? If so, then the answer is no; a key
> will only be present in a given window if an event was received that fits
> into that window.
>

To confirm:

So let's say I'l tracking the average time a file is opened in folders.

In window N we get the events:

{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}
{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}
{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User guide.txt"}
{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin guide.txt"}

So there will be aggregates stored for
("ca:fe:ba:be","/"), ("ca:fe:ba:be","/foo"), ("ca:fe:ba:be","/foo/bar"),
("ca:fe:ba:be","/foo/bar/README.txt"), etc

In window N+1 we do not get any events at all.

So the memory used by my aggregation functions from window N will be freed
and the storage will be effectively zero (modulo any follow on processing
that might be on a longer window)

This seems to be what you are saying... in which case my naïeve hope was
not so naïve! w00t!


>
> On 08.02.2019 13:21, Stephen Connolly wrote:
>
> Ok, I'll try and map my problem into something that should be familiar to
> most people.
>
> Consider collection of PCs, each of which has a unique ID, e.g.
> ca:fe:ba:be, de:ad:be:ef, etc.
>
> Each PC has a tree of local files. Some of the file paths are
> coincidentally the same names, but there is no file sharing between PCs.
>
> I need to produce metrics about how often files are opened and how long
> they are open for.
>
> I need for every X minute tumbling window not just the cumulative averages
> for each PC, but the averages for each file as well as the cumulative
> averegaes for each folder and their sub-folders.
>
> I have a stream of events like
>
>
> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}{"source":"de:ad:be:ef","action":"open","path":"/foo/manchu/README.txt"}
> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User
> guide.txt"}{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin
> guide.txt"}{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/User
> guide.txt","duration":"97"}{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/Admin
> guide.txt","duration":"196"}
> {"source":"ca:fe:ba:be","action":"open","path":"/foo/manchu/README.txt"}
> {"source":"de:ad:be:ef","action":"open","path":"/bar/foo/README.txt"}
>
> So from that I would like to know stuff like:
>
> ca:fe:ba:be had 4/X opens per minute in the X minute window
> ca:fe:ba:be had 3/X closes per minute in the X minute window and the
> average time open was (67+97+197)/3=120... there is no guarantee that the
> closes will be matched with opens in the same window, which is why I'm only
> tracking them separately
> de:ad:be:ef had 2/X opens per minute in the X minute window
> ca:fe:ba:be /foo had 4/X opens per minute in the X minute window
> ca:fe:ba:be /foo had 3/X closes per minute in the X minute window and the
> average time open was 120
> de:ad:be:ef /foo had 1/X opens per minute in the X minute window
> de:ad:be:ef /bar had 1/X opens per minute in the X minute window
> de:ad:be:ef /foo/manchu had 1/X opens per minute in the X minute window
> de:ad:be:ef /bar/foo had 1/X opens per minute in the X minute window
> de:ad:be:ef /foo/manchu/README.txt had 1/X opens per minute in the X
> minute window
> de:ad:be:ef /bar/foo/README.txt had 1/X opens per minute in the X minute
> window
> etc
>
> What I think I want to do is turn each event into a series of events with
> different keys, so that
>
> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}
>
> gets sent under the keys:
>
> ("ca:fe:ba:be","/")
> ("ca:fe:ba:be","/foo")
> ("ca:fe:ba:be","/foo/bar")
> ("ca:fe:ba:be","/foo/bar/README.txt")
>
> Then I could use a window aggregation function to just:
>
> * count the "open" events
> * count the "close" events and sum their duration
>
> Additionally, I am (naïevely) hoping that if a window has no events for a
> particular key, the memory/storage costs are zero for that key.
>
> From what I can see, to achieve what I am trying to do, I could use a
> flatMap followed by a keyBy
>
> In other words I take the events and flat map them based on the path split
> on '/' returning a Tuple of the (to be) key and the event. Then I can use
> keyBy to key based on the Tuple 0.
>
> My ask:
>
> Is the above design a good design? How would you achieve the end game
> better? Do I need to worry about many paths that are accessed rarely and
> would have an accumulator function that stays at 0 unless there are events
> in that window... or are the accumulators for each distinct key eagerly
> purged after each fire trigger.
>
> What gotcha's do I need to look for.
>
> Thanks in advance and appologies for the length
>
> -stephenc
>
>
>

Re: Reduce one event under multiple keys

Posted by Chesnay Schepler <ch...@apache.org>.
This sounds reasonable to me.

I'm a bit confused by this question: "/Additionally, I am (naïevely) 
hoping that if a window has no events for a particular key, the 
memory/storage costs are zero for that key./"

Are you asking whether a key that was received in window X (as part of 
an event) is still present in window x+1? If so, then the answer is no; 
a key will only be present in a given window if an event was received 
that fits into that window.

On 08.02.2019 13:21, Stephen Connolly wrote:
> Ok, I'll try and map my problem into something that should be familiar 
> to most people.
>
> Consider collection of PCs, each of which has a unique ID, e.g. 
> ca:fe:ba:be, de:ad:be:ef, etc.
>
> Each PC has a tree of local files. Some of the file paths are 
> coincidentally the same names, but there is no file sharing between PCs.
>
> I need to produce metrics about how often files are opened and how 
> long they are open for.
>
> I need for every X minute tumbling window not just the cumulative 
> averages for each PC, but the averages for each file as well as the 
> cumulative averegaes for each folder and their sub-folders.
>
> I have a stream of events like
>
> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}{"source":"de:ad:be:ef","action":"open","path":"/foo/manchu/README.txt"}
> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User 
> guide.txt"}{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin 
> guide.txt"}{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/User 
> guide.txt","duration":"97"}{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/Admin 
> guide.txt","duration":"196"}
> {"source":"ca:fe:ba:be","action":"open","path":"/foo/manchu/README.txt"}
> {"source":"de:ad:be:ef","action":"open","path":"/bar/foo/README.txt"}
>
> So from that I would like to know stuff like:
>
> ca:fe:ba:be had 4/X opens per minute in the X minute window
> ca:fe:ba:be had 3/X closes per minute in the X minute window and the 
> average time open was (67+97+197)/3=120... there is no guarantee that 
> the closes will be matched with opens in the same window, which is why 
> I'm only tracking them separately
> de:ad:be:ef had 2/X opens per minute in the X minute window
> ca:fe:ba:be /foo had 4/X opens per minute in the X minute window
> ca:fe:ba:be /foo had 3/X closes per minute in the X minute window and 
> the average time open was 120
> de:ad:be:ef /foo had 1/X opens per minute in the X minute window
> de:ad:be:ef /bar had 1/X opens per minute in the X minute window
> de:ad:be:ef /foo/manchu had 1/X opens per minute in the X minute window
> de:ad:be:ef /bar/foo had 1/X opens per minute in the X minute window
> de:ad:be:ef /foo/manchu/README.txt had 1/X opens per minute in the X 
> minute window
> de:ad:be:ef /bar/foo/README.txt had 1/X opens per minute in the X 
> minute window
> etc
>
> What I think I want to do is turn each event into a series of events 
> with different keys, so that
>
> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}
>
> gets sent under the keys:
>
> ("ca:fe:ba:be","/")
> ("ca:fe:ba:be","/foo")
> ("ca:fe:ba:be","/foo/bar")
> ("ca:fe:ba:be","/foo/bar/README.txt")
>
> Then I could use a window aggregation function to just:
>
> * count the "open" events
> * count the "close" events and sum their duration
>
> Additionally, I am (naïevely) hoping that if a window has no events 
> for a particular key, the memory/storage costs are zero for that key.
>
> From what I can see, to achieve what I am trying to do, I could use a 
> flatMap followed by a keyBy
>
> In other words I take the events and flat map them based on the path 
> split on '/' returning a Tuple of the (to be) key and the event. Then 
> I can use keyBy to key based on the Tuple 0.
>
> My ask:
>
> Is the above design a good design? How would you achieve the end game 
> better? Do I need to worry about many paths that are accessed rarely 
> and would have an accumulator function that stays at 0 unless there 
> are events in that window... or are the accumulators for each distinct 
> key eagerly purged after each fire trigger.
>
> What gotcha's do I need to look for.
>
> Thanks in advance and appologies for the length
>
> -stephenc