You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Jan Lukavský <je...@seznam.cz> on 2017/12/12 09:23:52 UTC

Storing large lists into state per key

Hi all,

I have a question that appears as a user@ question, but brought me into 
the dev@ mailing list while I was browsing through the Flink's source 
codes. First I'll try to briefly describe my use case. I'm trying to do 
a group-by-key operation with a limited number of distinct keys (which I 
cannot control), but a non trivial count of values. The operation in the 
GBK is non-combining, so that all values per key (many) have to be 
stored in a state. Running this on testing data led to a surprise (for 
me), that even when using RocksDBStateBackend, the whole list of data is 
serialized into single binary blob and then deserialized into List, and 
therefore has to fit in memory (multiple times, in fact).

I tried to create an alternative RocksDBStateBackend, that would store 
each element of list in ListState to a separate key in RocksDB, so that 
the whole blob would not have to be loaded by a single get, but a scan 
over multiple keys could be made. Digging into the source code I found 
there was a hierarchy of classes mirroring the public API in 'internal' 
package - InternalKvState, InternalMergingState, InternalListState, and 
so on. These classes however have different hierarchy than the public 
API classes that they mirror, most notably InternalKvState is 
superinterface of all others. This fact seems to be used on multiple 
places throughout the source code.

My question is - is this intentional? Would it be possible to store each 
element of a ListState in a separate key in RocksDB (probably by adding 
some suffix to the actual key of the state for each element)? What are 
the pitfalls? And is it necessary for the InternalListState to be 
actually subinterface of InternalKvState? I find this to be a related 
problem.

Many thanks for any comments or thoughts,

  Jan


Re: Storing large lists into state per key

Posted by Stephan Ewen <se...@apache.org>.
Thanks a lot!

On Tue, Dec 19, 2017 at 11:08 PM, Jan Lukavský <je...@seznam.cz> wrote:

> Hi,
>
> I filled a JIRA issue and pushed a PR for this.
>
> https://issues.apache.org/jira/browse/FLINK-8297
>
> Best,
>
>  Jan
>
>
> On 12/14/2017 11:13 AM, Stephan Ewen wrote:
>
>> Hi Jan!
>>
>> One could implement the RocksDB ListState like you suggested.
>>
>> We did it the current way because that pattern is actually quite efficient
>> if you list fits into memory - The list append is constant and the list
>> access is the first time the values are concatenated. Especially for
>> typical windowing patterns (frequent append(), occasional get()) this
>> works
>> quite well.
>>
>> It falls short when the lists get too large, that is correct. To break it
>> into individual elements means to have a range iterator for list.get()
>> access which I think is a bit more costly. It also needs a nifty way to
>> add
>> a 'position' number into the key to make sure the list remains ordered,
>> and
>> to not have to have extra read-modify-write state every time this number
>> is
>> updated.
>>
>> But all in all, it should be possible. Are you interested in working on
>> something like that and contributing it?
>>
>> Best,
>> Stephan
>>
>>
>> On Wed, Dec 13, 2017 at 2:22 PM, Aljoscha Krettek <al...@apache.org>
>> wrote:
>>
>> Hi,
>>>
>>> If I remember correctly, there was actually an effort to change the
>>> RocksDB list state the way you described. I'm cc'ing Stephan, who was
>>> involved in that and this is the Jira issue:
>>> https://issues.apache.org/jira/browse/FLINK-5756 <
>>> https://issues.apache.org/jira/browse/FLINK-5756>
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 12. Dec 2017, at 14:47, Ovidiu-Cristian MARCU <
>>>>
>>> ovidiu-cristian.marcu@inria.fr> wrote:
>>>
>>>> Hi Jan,
>>>>
>>>> You could associate a key to each element of your Key's list (e.g.,
>>>>
>>> hashing the value), keep only the keys in heap (e.g., in a list) and the
>>> associated state key-value/s in an external store like RocksDB/Redis, but
>>> you will notice large overheads due to de/serializing - a huge penatly
>>> for
>>> more than 1000s of elements (see https://hal.inria.fr/hal-01530
>>> 744/document <https://hal.inria.fr/hal-01530744/document> for some
>>> experimental settings) for relatively small rate of new events per Key,
>>> if
>>> needed to process all values of a Key for each new event. Best case you
>>> can
>>> do some incremental processing unless your non-combining means
>>> non-associative operations per Key.
>>>
>>>> Best,
>>>> Ovidiu
>>>>
>>>>> On 12 Dec 2017, at 11:54, Jan Lukavský <je...@seznam.cz> wrote:
>>>>>
>>>>> Hi Fabian,
>>>>>
>>>>> thanks for quick reply, what you suggest seems to work at first sight,
>>>>>
>>>> I will try it. Is there any reason not to implement a RocksDBListState
>>> this
>>> way in general? Is there any increased overhead of this approach?
>>>
>>>> Thanks,
>>>>>
>>>>> Jan
>>>>>
>>>>>
>>>>> On 12/12/2017 11:17 AM, Fabian Hueske wrote:
>>>>>
>>>>>> Hi Jan,
>>>>>>
>>>>>> I cannot comment on the internal design, but you could put the data
>>>>>>
>>>>> into a
>>>
>>>> RocksDBStateBackend MapState<Integer, X> where the value X is your data
>>>>>> type and the key is the list index. You would need another ValueState
>>>>>>
>>>>> for
>>>
>>>> the current number of elements that you put into the MapState.
>>>>>> A MapState allows to fetch and traverse the key, value, or entry set
>>>>>>
>>>>> of the
>>>
>>>> Map without loading it completely into memory.
>>>>>> The sets are traversed in sort order of the key, so should be in
>>>>>>
>>>>> insertion
>>>
>>>> order (given that you properly increment the list index).
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>> 2017-12-12 10:23 GMT+01:00 Jan Lukavský <je...@seznam.cz>:
>>>>>>
>>>>>> Hi all,
>>>>>>>
>>>>>>> I have a question that appears as a user@ question, but brought me
>>>>>>>
>>>>>> into
>>>
>>>> the dev@ mailing list while I was browsing through the Flink's source
>>>>>>> codes. First I'll try to briefly describe my use case. I'm trying to
>>>>>>>
>>>>>> do a
>>>
>>>> group-by-key operation with a limited number of distinct keys (which I
>>>>>>> cannot control), but a non trivial count of values. The operation in
>>>>>>>
>>>>>> the
>>>
>>>> GBK is non-combining, so that all values per key (many) have to be
>>>>>>>
>>>>>> stored
>>>
>>>> in a state. Running this on testing data led to a surprise (for me),
>>>>>>>
>>>>>> that
>>>
>>>> even when using RocksDBStateBackend, the whole list of data is
>>>>>>>
>>>>>> serialized
>>>
>>>> into single binary blob and then deserialized into List, and
>>>>>>>
>>>>>> therefore has
>>>
>>>> to fit in memory (multiple times, in fact).
>>>>>>>
>>>>>>> I tried to create an alternative RocksDBStateBackend, that would
>>>>>>> store
>>>>>>> each element of list in ListState to a separate key in RocksDB, so
>>>>>>>
>>>>>> that the
>>>
>>>> whole blob would not have to be loaded by a single get, but a scan
>>>>>>>
>>>>>> over
>>>
>>>> multiple keys could be made. Digging into the source code I found
>>>>>>>
>>>>>> there was
>>>
>>>> a hierarchy of classes mirroring the public API in 'internal' package
>>>>>>>
>>>>>> -
>>>
>>>> InternalKvState, InternalMergingState, InternalListState, and so on.
>>>>>>>
>>>>>> These
>>>
>>>> classes however have different hierarchy than the public API classes
>>>>>>>
>>>>>> that
>>>
>>>> they mirror, most notably InternalKvState is superinterface of all
>>>>>>>
>>>>>> others.
>>>
>>>> This fact seems to be used on multiple places throughout the source
>>>>>>>
>>>>>> code.
>>>
>>>> My question is - is this intentional? Would it be possible to store
>>>>>>>
>>>>>> each
>>>
>>>> element of a ListState in a separate key in RocksDB (probably by
>>>>>>>
>>>>>> adding
>>>
>>>> some suffix to the actual key of the state for each element)? What
>>>>>>>
>>>>>> are the
>>>
>>>> pitfalls? And is it necessary for the InternalListState to be actually
>>>>>>> subinterface of InternalKvState? I find this to be a related problem.
>>>>>>>
>>>>>>> Many thanks for any comments or thoughts,
>>>>>>>
>>>>>>> Jan
>>>>>>>
>>>>>>>
>>>>>>>
>>>
>

Re: Storing large lists into state per key

Posted by Jan Lukavský <je...@seznam.cz>.
Hi,

I filled a JIRA issue and pushed a PR for this.

https://issues.apache.org/jira/browse/FLINK-8297

Best,

  Jan


On 12/14/2017 11:13 AM, Stephan Ewen wrote:
> Hi Jan!
>
> One could implement the RocksDB ListState like you suggested.
>
> We did it the current way because that pattern is actually quite efficient
> if you list fits into memory - The list append is constant and the list
> access is the first time the values are concatenated. Especially for
> typical windowing patterns (frequent append(), occasional get()) this works
> quite well.
>
> It falls short when the lists get too large, that is correct. To break it
> into individual elements means to have a range iterator for list.get()
> access which I think is a bit more costly. It also needs a nifty way to add
> a 'position' number into the key to make sure the list remains ordered, and
> to not have to have extra read-modify-write state every time this number is
> updated.
>
> But all in all, it should be possible. Are you interested in working on
> something like that and contributing it?
>
> Best,
> Stephan
>
>
> On Wed, Dec 13, 2017 at 2:22 PM, Aljoscha Krettek <al...@apache.org>
> wrote:
>
>> Hi,
>>
>> If I remember correctly, there was actually an effort to change the
>> RocksDB list state the way you described. I'm cc'ing Stephan, who was
>> involved in that and this is the Jira issue:
>> https://issues.apache.org/jira/browse/FLINK-5756 <
>> https://issues.apache.org/jira/browse/FLINK-5756>
>>
>> Best,
>> Aljoscha
>>
>>> On 12. Dec 2017, at 14:47, Ovidiu-Cristian MARCU <
>> ovidiu-cristian.marcu@inria.fr> wrote:
>>> Hi Jan,
>>>
>>> You could associate a key to each element of your Key's list (e.g.,
>> hashing the value), keep only the keys in heap (e.g., in a list) and the
>> associated state key-value/s in an external store like RocksDB/Redis, but
>> you will notice large overheads due to de/serializing - a huge penatly for
>> more than 1000s of elements (see https://hal.inria.fr/hal-01530
>> 744/document <https://hal.inria.fr/hal-01530744/document> for some
>> experimental settings) for relatively small rate of new events per Key, if
>> needed to process all values of a Key for each new event. Best case you can
>> do some incremental processing unless your non-combining means
>> non-associative operations per Key.
>>> Best,
>>> Ovidiu
>>>> On 12 Dec 2017, at 11:54, Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>> Hi Fabian,
>>>>
>>>> thanks for quick reply, what you suggest seems to work at first sight,
>> I will try it. Is there any reason not to implement a RocksDBListState this
>> way in general? Is there any increased overhead of this approach?
>>>> Thanks,
>>>>
>>>> Jan
>>>>
>>>>
>>>> On 12/12/2017 11:17 AM, Fabian Hueske wrote:
>>>>> Hi Jan,
>>>>>
>>>>> I cannot comment on the internal design, but you could put the data
>> into a
>>>>> RocksDBStateBackend MapState<Integer, X> where the value X is your data
>>>>> type and the key is the list index. You would need another ValueState
>> for
>>>>> the current number of elements that you put into the MapState.
>>>>> A MapState allows to fetch and traverse the key, value, or entry set
>> of the
>>>>> Map without loading it completely into memory.
>>>>> The sets are traversed in sort order of the key, so should be in
>> insertion
>>>>> order (given that you properly increment the list index).
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> 2017-12-12 10:23 GMT+01:00 Jan Lukavský <je...@seznam.cz>:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I have a question that appears as a user@ question, but brought me
>> into
>>>>>> the dev@ mailing list while I was browsing through the Flink's source
>>>>>> codes. First I'll try to briefly describe my use case. I'm trying to
>> do a
>>>>>> group-by-key operation with a limited number of distinct keys (which I
>>>>>> cannot control), but a non trivial count of values. The operation in
>> the
>>>>>> GBK is non-combining, so that all values per key (many) have to be
>> stored
>>>>>> in a state. Running this on testing data led to a surprise (for me),
>> that
>>>>>> even when using RocksDBStateBackend, the whole list of data is
>> serialized
>>>>>> into single binary blob and then deserialized into List, and
>> therefore has
>>>>>> to fit in memory (multiple times, in fact).
>>>>>>
>>>>>> I tried to create an alternative RocksDBStateBackend, that would store
>>>>>> each element of list in ListState to a separate key in RocksDB, so
>> that the
>>>>>> whole blob would not have to be loaded by a single get, but a scan
>> over
>>>>>> multiple keys could be made. Digging into the source code I found
>> there was
>>>>>> a hierarchy of classes mirroring the public API in 'internal' package
>> -
>>>>>> InternalKvState, InternalMergingState, InternalListState, and so on.
>> These
>>>>>> classes however have different hierarchy than the public API classes
>> that
>>>>>> they mirror, most notably InternalKvState is superinterface of all
>> others.
>>>>>> This fact seems to be used on multiple places throughout the source
>> code.
>>>>>> My question is - is this intentional? Would it be possible to store
>> each
>>>>>> element of a ListState in a separate key in RocksDB (probably by
>> adding
>>>>>> some suffix to the actual key of the state for each element)? What
>> are the
>>>>>> pitfalls? And is it necessary for the InternalListState to be actually
>>>>>> subinterface of InternalKvState? I find this to be a related problem.
>>>>>>
>>>>>> Many thanks for any comments or thoughts,
>>>>>>
>>>>>> Jan
>>>>>>
>>>>>>
>>


Re: Storing large lists into state per key

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Stephen,

yes, definitely. I have put together a POC implementation that seems to 
work for my use-case (not yet tested for performance, though). A have 
put together a PR, just for discussion of the topic, here:

https://github.com/datadrivencz/flink/pull/1/

I know, that the PR doesn't follow the guidelines for submitting PRs, 
but I consider it still a WIP and its purpose is just to agree upon the 
implementation details. Would you find a few moments to walk it through?

Or should I file a JIRA and have the discussion there? No problem with 
that, I just thought that discussion directly with the code could be 
more productive in this case.

Thanks,

  Jan


On 12/14/2017 11:13 AM, Stephan Ewen wrote:
> Hi Jan!
>
> One could implement the RocksDB ListState like you suggested.
>
> We did it the current way because that pattern is actually quite efficient
> if you list fits into memory - The list append is constant and the list
> access is the first time the values are concatenated. Especially for
> typical windowing patterns (frequent append(), occasional get()) this works
> quite well.
>
> It falls short when the lists get too large, that is correct. To break it
> into individual elements means to have a range iterator for list.get()
> access which I think is a bit more costly. It also needs a nifty way to add
> a 'position' number into the key to make sure the list remains ordered, and
> to not have to have extra read-modify-write state every time this number is
> updated.
>
> But all in all, it should be possible. Are you interested in working on
> something like that and contributing it?
>
> Best,
> Stephan
>
>
> On Wed, Dec 13, 2017 at 2:22 PM, Aljoscha Krettek <al...@apache.org>
> wrote:
>
>> Hi,
>>
>> If I remember correctly, there was actually an effort to change the
>> RocksDB list state the way you described. I'm cc'ing Stephan, who was
>> involved in that and this is the Jira issue:
>> https://issues.apache.org/jira/browse/FLINK-5756 <
>> https://issues.apache.org/jira/browse/FLINK-5756>
>>
>> Best,
>> Aljoscha
>>
>>> On 12. Dec 2017, at 14:47, Ovidiu-Cristian MARCU <
>> ovidiu-cristian.marcu@inria.fr> wrote:
>>> Hi Jan,
>>>
>>> You could associate a key to each element of your Key's list (e.g.,
>> hashing the value), keep only the keys in heap (e.g., in a list) and the
>> associated state key-value/s in an external store like RocksDB/Redis, but
>> you will notice large overheads due to de/serializing - a huge penatly for
>> more than 1000s of elements (see https://hal.inria.fr/hal-01530
>> 744/document <https://hal.inria.fr/hal-01530744/document> for some
>> experimental settings) for relatively small rate of new events per Key, if
>> needed to process all values of a Key for each new event. Best case you can
>> do some incremental processing unless your non-combining means
>> non-associative operations per Key.
>>> Best,
>>> Ovidiu
>>>> On 12 Dec 2017, at 11:54, Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>> Hi Fabian,
>>>>
>>>> thanks for quick reply, what you suggest seems to work at first sight,
>> I will try it. Is there any reason not to implement a RocksDBListState this
>> way in general? Is there any increased overhead of this approach?
>>>> Thanks,
>>>>
>>>> Jan
>>>>
>>>>
>>>> On 12/12/2017 11:17 AM, Fabian Hueske wrote:
>>>>> Hi Jan,
>>>>>
>>>>> I cannot comment on the internal design, but you could put the data
>> into a
>>>>> RocksDBStateBackend MapState<Integer, X> where the value X is your data
>>>>> type and the key is the list index. You would need another ValueState
>> for
>>>>> the current number of elements that you put into the MapState.
>>>>> A MapState allows to fetch and traverse the key, value, or entry set
>> of the
>>>>> Map without loading it completely into memory.
>>>>> The sets are traversed in sort order of the key, so should be in
>> insertion
>>>>> order (given that you properly increment the list index).
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> 2017-12-12 10:23 GMT+01:00 Jan Lukavský <je...@seznam.cz>:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I have a question that appears as a user@ question, but brought me
>> into
>>>>>> the dev@ mailing list while I was browsing through the Flink's source
>>>>>> codes. First I'll try to briefly describe my use case. I'm trying to
>> do a
>>>>>> group-by-key operation with a limited number of distinct keys (which I
>>>>>> cannot control), but a non trivial count of values. The operation in
>> the
>>>>>> GBK is non-combining, so that all values per key (many) have to be
>> stored
>>>>>> in a state. Running this on testing data led to a surprise (for me),
>> that
>>>>>> even when using RocksDBStateBackend, the whole list of data is
>> serialized
>>>>>> into single binary blob and then deserialized into List, and
>> therefore has
>>>>>> to fit in memory (multiple times, in fact).
>>>>>>
>>>>>> I tried to create an alternative RocksDBStateBackend, that would store
>>>>>> each element of list in ListState to a separate key in RocksDB, so
>> that the
>>>>>> whole blob would not have to be loaded by a single get, but a scan
>> over
>>>>>> multiple keys could be made. Digging into the source code I found
>> there was
>>>>>> a hierarchy of classes mirroring the public API in 'internal' package
>> -
>>>>>> InternalKvState, InternalMergingState, InternalListState, and so on.
>> These
>>>>>> classes however have different hierarchy than the public API classes
>> that
>>>>>> they mirror, most notably InternalKvState is superinterface of all
>> others.
>>>>>> This fact seems to be used on multiple places throughout the source
>> code.
>>>>>> My question is - is this intentional? Would it be possible to store
>> each
>>>>>> element of a ListState in a separate key in RocksDB (probably by
>> adding
>>>>>> some suffix to the actual key of the state for each element)? What
>> are the
>>>>>> pitfalls? And is it necessary for the InternalListState to be actually
>>>>>> subinterface of InternalKvState? I find this to be a related problem.
>>>>>>
>>>>>> Many thanks for any comments or thoughts,
>>>>>>
>>>>>> Jan
>>>>>>
>>>>>>
>>


Re: Storing large lists into state per key

Posted by Stephan Ewen <se...@apache.org>.
Hi Jan!

One could implement the RocksDB ListState like you suggested.

We did it the current way because that pattern is actually quite efficient
if you list fits into memory - The list append is constant and the list
access is the first time the values are concatenated. Especially for
typical windowing patterns (frequent append(), occasional get()) this works
quite well.

It falls short when the lists get too large, that is correct. To break it
into individual elements means to have a range iterator for list.get()
access which I think is a bit more costly. It also needs a nifty way to add
a 'position' number into the key to make sure the list remains ordered, and
to not have to have extra read-modify-write state every time this number is
updated.

But all in all, it should be possible. Are you interested in working on
something like that and contributing it?

Best,
Stephan


On Wed, Dec 13, 2017 at 2:22 PM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
>
> If I remember correctly, there was actually an effort to change the
> RocksDB list state the way you described. I'm cc'ing Stephan, who was
> involved in that and this is the Jira issue:
> https://issues.apache.org/jira/browse/FLINK-5756 <
> https://issues.apache.org/jira/browse/FLINK-5756>
>
> Best,
> Aljoscha
>
> > On 12. Dec 2017, at 14:47, Ovidiu-Cristian MARCU <
> ovidiu-cristian.marcu@inria.fr> wrote:
> >
> > Hi Jan,
> >
> > You could associate a key to each element of your Key's list (e.g.,
> hashing the value), keep only the keys in heap (e.g., in a list) and the
> associated state key-value/s in an external store like RocksDB/Redis, but
> you will notice large overheads due to de/serializing - a huge penatly for
> more than 1000s of elements (see https://hal.inria.fr/hal-01530
> 744/document <https://hal.inria.fr/hal-01530744/document> for some
> experimental settings) for relatively small rate of new events per Key, if
> needed to process all values of a Key for each new event. Best case you can
> do some incremental processing unless your non-combining means
> non-associative operations per Key.
> >
> > Best,
> > Ovidiu
> >> On 12 Dec 2017, at 11:54, Jan Lukavský <je...@seznam.cz> wrote:
> >>
> >> Hi Fabian,
> >>
> >> thanks for quick reply, what you suggest seems to work at first sight,
> I will try it. Is there any reason not to implement a RocksDBListState this
> way in general? Is there any increased overhead of this approach?
> >>
> >> Thanks,
> >>
> >> Jan
> >>
> >>
> >> On 12/12/2017 11:17 AM, Fabian Hueske wrote:
> >>> Hi Jan,
> >>>
> >>> I cannot comment on the internal design, but you could put the data
> into a
> >>> RocksDBStateBackend MapState<Integer, X> where the value X is your data
> >>> type and the key is the list index. You would need another ValueState
> for
> >>> the current number of elements that you put into the MapState.
> >>> A MapState allows to fetch and traverse the key, value, or entry set
> of the
> >>> Map without loading it completely into memory.
> >>> The sets are traversed in sort order of the key, so should be in
> insertion
> >>> order (given that you properly increment the list index).
> >>>
> >>> Best, Fabian
> >>>
> >>> 2017-12-12 10:23 GMT+01:00 Jan Lukavský <je...@seznam.cz>:
> >>>
> >>>> Hi all,
> >>>>
> >>>> I have a question that appears as a user@ question, but brought me
> into
> >>>> the dev@ mailing list while I was browsing through the Flink's source
> >>>> codes. First I'll try to briefly describe my use case. I'm trying to
> do a
> >>>> group-by-key operation with a limited number of distinct keys (which I
> >>>> cannot control), but a non trivial count of values. The operation in
> the
> >>>> GBK is non-combining, so that all values per key (many) have to be
> stored
> >>>> in a state. Running this on testing data led to a surprise (for me),
> that
> >>>> even when using RocksDBStateBackend, the whole list of data is
> serialized
> >>>> into single binary blob and then deserialized into List, and
> therefore has
> >>>> to fit in memory (multiple times, in fact).
> >>>>
> >>>> I tried to create an alternative RocksDBStateBackend, that would store
> >>>> each element of list in ListState to a separate key in RocksDB, so
> that the
> >>>> whole blob would not have to be loaded by a single get, but a scan
> over
> >>>> multiple keys could be made. Digging into the source code I found
> there was
> >>>> a hierarchy of classes mirroring the public API in 'internal' package
> -
> >>>> InternalKvState, InternalMergingState, InternalListState, and so on.
> These
> >>>> classes however have different hierarchy than the public API classes
> that
> >>>> they mirror, most notably InternalKvState is superinterface of all
> others.
> >>>> This fact seems to be used on multiple places throughout the source
> code.
> >>>>
> >>>> My question is - is this intentional? Would it be possible to store
> each
> >>>> element of a ListState in a separate key in RocksDB (probably by
> adding
> >>>> some suffix to the actual key of the state for each element)? What
> are the
> >>>> pitfalls? And is it necessary for the InternalListState to be actually
> >>>> subinterface of InternalKvState? I find this to be a related problem.
> >>>>
> >>>> Many thanks for any comments or thoughts,
> >>>>
> >>>> Jan
> >>>>
> >>>>
> >>
> >
>
>

Re: Storing large lists into state per key

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

Using a MapState is a workaround that should work but it would be nice if ListState would work for state that is too big to fit into memory.

Best,
Aljoscha

> On 13. Dec 2017, at 17:40, Jan Lukavský <je...@seznam.cz> wrote:
> 
> Hi Aljoscha,
> 
> thanks for reply. Do you see any issues in implementing the list state the way Fabian suggested (i.e. using the MapState)? I feel there are some open questions, mostly because the InternalListState (which I suppose the RocksDBListState should implement) extends InternalKvState, which in turn suggests, that the correct implementation *should* behave exactly as the current implementation does - serialize the list into one field and store as key-value. Do you think there would be any major issues with this?
> 
> Many thanks,
> 
>  Jan
> 
> 
> On 12/13/2017 02:22 PM, Aljoscha Krettek wrote:
>> Hi,
>> 
>> If I remember correctly, there was actually an effort to change the RocksDB list state the way you described. I'm cc'ing Stephan, who was involved in that and this is the Jira issue: https://issues.apache.org/jira/browse/FLINK-5756 <https://issues.apache.org/jira/browse/FLINK-5756>
>> 
>> Best,
>> Aljoscha
>> 
>>> On 12. Dec 2017, at 14:47, Ovidiu-Cristian MARCU <ov...@inria.fr> wrote:
>>> 
>>> Hi Jan,
>>> 
>>> You could associate a key to each element of your Key's list (e.g., hashing the value), keep only the keys in heap (e.g., in a list) and the associated state key-value/s in an external store like RocksDB/Redis, but you will notice large overheads due to de/serializing - a huge penatly for more than 1000s of elements (see https://hal.inria.fr/hal-01530744/document <https://hal.inria.fr/hal-01530744/document> for some experimental settings) for relatively small rate of new events per Key, if needed to process all values of a Key for each new event. Best case you can do some incremental processing unless your non-combining means non-associative operations per Key.
>>> 
>>> Best,
>>> Ovidiu
>>>> On 12 Dec 2017, at 11:54, Jan Lukavský <je...@seznam.cz> wrote:
>>>> 
>>>> Hi Fabian,
>>>> 
>>>> thanks for quick reply, what you suggest seems to work at first sight, I will try it. Is there any reason not to implement a RocksDBListState this way in general? Is there any increased overhead of this approach?
>>>> 
>>>> Thanks,
>>>> 
>>>> Jan
>>>> 
>>>> 
>>>> On 12/12/2017 11:17 AM, Fabian Hueske wrote:
>>>>> Hi Jan,
>>>>> 
>>>>> I cannot comment on the internal design, but you could put the data into a
>>>>> RocksDBStateBackend MapState<Integer, X> where the value X is your data
>>>>> type and the key is the list index. You would need another ValueState for
>>>>> the current number of elements that you put into the MapState.
>>>>> A MapState allows to fetch and traverse the key, value, or entry set of the
>>>>> Map without loading it completely into memory.
>>>>> The sets are traversed in sort order of the key, so should be in insertion
>>>>> order (given that you properly increment the list index).
>>>>> 
>>>>> Best, Fabian
>>>>> 
>>>>> 2017-12-12 10:23 GMT+01:00 Jan Lukavský <je...@seznam.cz>:
>>>>> 
>>>>>> Hi all,
>>>>>> 
>>>>>> I have a question that appears as a user@ question, but brought me into
>>>>>> the dev@ mailing list while I was browsing through the Flink's source
>>>>>> codes. First I'll try to briefly describe my use case. I'm trying to do a
>>>>>> group-by-key operation with a limited number of distinct keys (which I
>>>>>> cannot control), but a non trivial count of values. The operation in the
>>>>>> GBK is non-combining, so that all values per key (many) have to be stored
>>>>>> in a state. Running this on testing data led to a surprise (for me), that
>>>>>> even when using RocksDBStateBackend, the whole list of data is serialized
>>>>>> into single binary blob and then deserialized into List, and therefore has
>>>>>> to fit in memory (multiple times, in fact).
>>>>>> 
>>>>>> I tried to create an alternative RocksDBStateBackend, that would store
>>>>>> each element of list in ListState to a separate key in RocksDB, so that the
>>>>>> whole blob would not have to be loaded by a single get, but a scan over
>>>>>> multiple keys could be made. Digging into the source code I found there was
>>>>>> a hierarchy of classes mirroring the public API in 'internal' package -
>>>>>> InternalKvState, InternalMergingState, InternalListState, and so on. These
>>>>>> classes however have different hierarchy than the public API classes that
>>>>>> they mirror, most notably InternalKvState is superinterface of all others.
>>>>>> This fact seems to be used on multiple places throughout the source code.
>>>>>> 
>>>>>> My question is - is this intentional? Would it be possible to store each
>>>>>> element of a ListState in a separate key in RocksDB (probably by adding
>>>>>> some suffix to the actual key of the state for each element)? What are the
>>>>>> pitfalls? And is it necessary for the InternalListState to be actually
>>>>>> subinterface of InternalKvState? I find this to be a related problem.
>>>>>> 
>>>>>> Many thanks for any comments or thoughts,
>>>>>> 
>>>>>> Jan
>>>>>> 
>>>>>> 
>> 
> 


Re: Storing large lists into state per key

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Aljoscha,

thanks for reply. Do you see any issues in implementing the list state 
the way Fabian suggested (i.e. using the MapState)? I feel there are 
some open questions, mostly because the InternalListState (which I 
suppose the RocksDBListState should implement) extends InternalKvState, 
which in turn suggests, that the correct implementation *should* behave 
exactly as the current implementation does - serialize the list into one 
field and store as key-value. Do you think there would be any major 
issues with this?

Many thanks,

  Jan


On 12/13/2017 02:22 PM, Aljoscha Krettek wrote:
> Hi,
>
> If I remember correctly, there was actually an effort to change the RocksDB list state the way you described. I'm cc'ing Stephan, who was involved in that and this is the Jira issue: https://issues.apache.org/jira/browse/FLINK-5756 <https://issues.apache.org/jira/browse/FLINK-5756>
>
> Best,
> Aljoscha
>
>> On 12. Dec 2017, at 14:47, Ovidiu-Cristian MARCU <ov...@inria.fr> wrote:
>>
>> Hi Jan,
>>
>> You could associate a key to each element of your Key's list (e.g., hashing the value), keep only the keys in heap (e.g., in a list) and the associated state key-value/s in an external store like RocksDB/Redis, but you will notice large overheads due to de/serializing - a huge penatly for more than 1000s of elements (see https://hal.inria.fr/hal-01530744/document <https://hal.inria.fr/hal-01530744/document> for some experimental settings) for relatively small rate of new events per Key, if needed to process all values of a Key for each new event. Best case you can do some incremental processing unless your non-combining means non-associative operations per Key.
>>
>> Best,
>> Ovidiu
>>> On 12 Dec 2017, at 11:54, Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>> Hi Fabian,
>>>
>>> thanks for quick reply, what you suggest seems to work at first sight, I will try it. Is there any reason not to implement a RocksDBListState this way in general? Is there any increased overhead of this approach?
>>>
>>> Thanks,
>>>
>>> Jan
>>>
>>>
>>> On 12/12/2017 11:17 AM, Fabian Hueske wrote:
>>>> Hi Jan,
>>>>
>>>> I cannot comment on the internal design, but you could put the data into a
>>>> RocksDBStateBackend MapState<Integer, X> where the value X is your data
>>>> type and the key is the list index. You would need another ValueState for
>>>> the current number of elements that you put into the MapState.
>>>> A MapState allows to fetch and traverse the key, value, or entry set of the
>>>> Map without loading it completely into memory.
>>>> The sets are traversed in sort order of the key, so should be in insertion
>>>> order (given that you properly increment the list index).
>>>>
>>>> Best, Fabian
>>>>
>>>> 2017-12-12 10:23 GMT+01:00 Jan Lukavský <je...@seznam.cz>:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I have a question that appears as a user@ question, but brought me into
>>>>> the dev@ mailing list while I was browsing through the Flink's source
>>>>> codes. First I'll try to briefly describe my use case. I'm trying to do a
>>>>> group-by-key operation with a limited number of distinct keys (which I
>>>>> cannot control), but a non trivial count of values. The operation in the
>>>>> GBK is non-combining, so that all values per key (many) have to be stored
>>>>> in a state. Running this on testing data led to a surprise (for me), that
>>>>> even when using RocksDBStateBackend, the whole list of data is serialized
>>>>> into single binary blob and then deserialized into List, and therefore has
>>>>> to fit in memory (multiple times, in fact).
>>>>>
>>>>> I tried to create an alternative RocksDBStateBackend, that would store
>>>>> each element of list in ListState to a separate key in RocksDB, so that the
>>>>> whole blob would not have to be loaded by a single get, but a scan over
>>>>> multiple keys could be made. Digging into the source code I found there was
>>>>> a hierarchy of classes mirroring the public API in 'internal' package -
>>>>> InternalKvState, InternalMergingState, InternalListState, and so on. These
>>>>> classes however have different hierarchy than the public API classes that
>>>>> they mirror, most notably InternalKvState is superinterface of all others.
>>>>> This fact seems to be used on multiple places throughout the source code.
>>>>>
>>>>> My question is - is this intentional? Would it be possible to store each
>>>>> element of a ListState in a separate key in RocksDB (probably by adding
>>>>> some suffix to the actual key of the state for each element)? What are the
>>>>> pitfalls? And is it necessary for the InternalListState to be actually
>>>>> subinterface of InternalKvState? I find this to be a related problem.
>>>>>
>>>>> Many thanks for any comments or thoughts,
>>>>>
>>>>> Jan
>>>>>
>>>>>
>


Re: Storing large lists into state per key

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

If I remember correctly, there was actually an effort to change the RocksDB list state the way you described. I'm cc'ing Stephan, who was involved in that and this is the Jira issue: https://issues.apache.org/jira/browse/FLINK-5756 <https://issues.apache.org/jira/browse/FLINK-5756>

Best,
Aljoscha

> On 12. Dec 2017, at 14:47, Ovidiu-Cristian MARCU <ov...@inria.fr> wrote:
> 
> Hi Jan,
> 
> You could associate a key to each element of your Key's list (e.g., hashing the value), keep only the keys in heap (e.g., in a list) and the associated state key-value/s in an external store like RocksDB/Redis, but you will notice large overheads due to de/serializing - a huge penatly for more than 1000s of elements (see https://hal.inria.fr/hal-01530744/document <https://hal.inria.fr/hal-01530744/document> for some experimental settings) for relatively small rate of new events per Key, if needed to process all values of a Key for each new event. Best case you can do some incremental processing unless your non-combining means non-associative operations per Key.
> 
> Best,
> Ovidiu
>> On 12 Dec 2017, at 11:54, Jan Lukavský <je...@seznam.cz> wrote:
>> 
>> Hi Fabian,
>> 
>> thanks for quick reply, what you suggest seems to work at first sight, I will try it. Is there any reason not to implement a RocksDBListState this way in general? Is there any increased overhead of this approach?
>> 
>> Thanks,
>> 
>> Jan
>> 
>> 
>> On 12/12/2017 11:17 AM, Fabian Hueske wrote:
>>> Hi Jan,
>>> 
>>> I cannot comment on the internal design, but you could put the data into a
>>> RocksDBStateBackend MapState<Integer, X> where the value X is your data
>>> type and the key is the list index. You would need another ValueState for
>>> the current number of elements that you put into the MapState.
>>> A MapState allows to fetch and traverse the key, value, or entry set of the
>>> Map without loading it completely into memory.
>>> The sets are traversed in sort order of the key, so should be in insertion
>>> order (given that you properly increment the list index).
>>> 
>>> Best, Fabian
>>> 
>>> 2017-12-12 10:23 GMT+01:00 Jan Lukavský <je...@seznam.cz>:
>>> 
>>>> Hi all,
>>>> 
>>>> I have a question that appears as a user@ question, but brought me into
>>>> the dev@ mailing list while I was browsing through the Flink's source
>>>> codes. First I'll try to briefly describe my use case. I'm trying to do a
>>>> group-by-key operation with a limited number of distinct keys (which I
>>>> cannot control), but a non trivial count of values. The operation in the
>>>> GBK is non-combining, so that all values per key (many) have to be stored
>>>> in a state. Running this on testing data led to a surprise (for me), that
>>>> even when using RocksDBStateBackend, the whole list of data is serialized
>>>> into single binary blob and then deserialized into List, and therefore has
>>>> to fit in memory (multiple times, in fact).
>>>> 
>>>> I tried to create an alternative RocksDBStateBackend, that would store
>>>> each element of list in ListState to a separate key in RocksDB, so that the
>>>> whole blob would not have to be loaded by a single get, but a scan over
>>>> multiple keys could be made. Digging into the source code I found there was
>>>> a hierarchy of classes mirroring the public API in 'internal' package -
>>>> InternalKvState, InternalMergingState, InternalListState, and so on. These
>>>> classes however have different hierarchy than the public API classes that
>>>> they mirror, most notably InternalKvState is superinterface of all others.
>>>> This fact seems to be used on multiple places throughout the source code.
>>>> 
>>>> My question is - is this intentional? Would it be possible to store each
>>>> element of a ListState in a separate key in RocksDB (probably by adding
>>>> some suffix to the actual key of the state for each element)? What are the
>>>> pitfalls? And is it necessary for the InternalListState to be actually
>>>> subinterface of InternalKvState? I find this to be a related problem.
>>>> 
>>>> Many thanks for any comments or thoughts,
>>>> 
>>>> Jan
>>>> 
>>>> 
>> 
> 


Re: Storing large lists into state per key

Posted by Ovidiu-Cristian MARCU <ov...@inria.fr>.
Hi Jan,

You could associate a key to each element of your Key's list (e.g., hashing the value), keep only the keys in heap (e.g., in a list) and the associated state key-value/s in an external store like RocksDB/Redis, but you will notice large overheads due to de/serializing - a huge penatly for more than 1000s of elements (see https://hal.inria.fr/hal-01530744/document <https://hal.inria.fr/hal-01530744/document> for some experimental settings) for relatively small rate of new events per Key, if needed to process all values of a Key for each new event. Best case you can do some incremental processing unless your non-combining means non-associative operations per Key.

Best,
Ovidiu
> On 12 Dec 2017, at 11:54, Jan Lukavský <je...@seznam.cz> wrote:
> 
> Hi Fabian,
> 
> thanks for quick reply, what you suggest seems to work at first sight, I will try it. Is there any reason not to implement a RocksDBListState this way in general? Is there any increased overhead of this approach?
> 
> Thanks,
> 
>  Jan
> 
> 
> On 12/12/2017 11:17 AM, Fabian Hueske wrote:
>> Hi Jan,
>> 
>> I cannot comment on the internal design, but you could put the data into a
>> RocksDBStateBackend MapState<Integer, X> where the value X is your data
>> type and the key is the list index. You would need another ValueState for
>> the current number of elements that you put into the MapState.
>> A MapState allows to fetch and traverse the key, value, or entry set of the
>> Map without loading it completely into memory.
>> The sets are traversed in sort order of the key, so should be in insertion
>> order (given that you properly increment the list index).
>> 
>> Best, Fabian
>> 
>> 2017-12-12 10:23 GMT+01:00 Jan Lukavský <je...@seznam.cz>:
>> 
>>> Hi all,
>>> 
>>> I have a question that appears as a user@ question, but brought me into
>>> the dev@ mailing list while I was browsing through the Flink's source
>>> codes. First I'll try to briefly describe my use case. I'm trying to do a
>>> group-by-key operation with a limited number of distinct keys (which I
>>> cannot control), but a non trivial count of values. The operation in the
>>> GBK is non-combining, so that all values per key (many) have to be stored
>>> in a state. Running this on testing data led to a surprise (for me), that
>>> even when using RocksDBStateBackend, the whole list of data is serialized
>>> into single binary blob and then deserialized into List, and therefore has
>>> to fit in memory (multiple times, in fact).
>>> 
>>> I tried to create an alternative RocksDBStateBackend, that would store
>>> each element of list in ListState to a separate key in RocksDB, so that the
>>> whole blob would not have to be loaded by a single get, but a scan over
>>> multiple keys could be made. Digging into the source code I found there was
>>> a hierarchy of classes mirroring the public API in 'internal' package -
>>> InternalKvState, InternalMergingState, InternalListState, and so on. These
>>> classes however have different hierarchy than the public API classes that
>>> they mirror, most notably InternalKvState is superinterface of all others.
>>> This fact seems to be used on multiple places throughout the source code.
>>> 
>>> My question is - is this intentional? Would it be possible to store each
>>> element of a ListState in a separate key in RocksDB (probably by adding
>>> some suffix to the actual key of the state for each element)? What are the
>>> pitfalls? And is it necessary for the InternalListState to be actually
>>> subinterface of InternalKvState? I find this to be a related problem.
>>> 
>>> Many thanks for any comments or thoughts,
>>> 
>>>  Jan
>>> 
>>> 
> 


Re: Storing large lists into state per key

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Fabian,

thanks for quick reply, what you suggest seems to work at first sight, I 
will try it. Is there any reason not to implement a RocksDBListState 
this way in general? Is there any increased overhead of this approach?

Thanks,

  Jan


On 12/12/2017 11:17 AM, Fabian Hueske wrote:
> Hi Jan,
>
> I cannot comment on the internal design, but you could put the data into a
> RocksDBStateBackend MapState<Integer, X> where the value X is your data
> type and the key is the list index. You would need another ValueState for
> the current number of elements that you put into the MapState.
> A MapState allows to fetch and traverse the key, value, or entry set of the
> Map without loading it completely into memory.
> The sets are traversed in sort order of the key, so should be in insertion
> order (given that you properly increment the list index).
>
> Best, Fabian
>
> 2017-12-12 10:23 GMT+01:00 Jan Lukavský <je...@seznam.cz>:
>
>> Hi all,
>>
>> I have a question that appears as a user@ question, but brought me into
>> the dev@ mailing list while I was browsing through the Flink's source
>> codes. First I'll try to briefly describe my use case. I'm trying to do a
>> group-by-key operation with a limited number of distinct keys (which I
>> cannot control), but a non trivial count of values. The operation in the
>> GBK is non-combining, so that all values per key (many) have to be stored
>> in a state. Running this on testing data led to a surprise (for me), that
>> even when using RocksDBStateBackend, the whole list of data is serialized
>> into single binary blob and then deserialized into List, and therefore has
>> to fit in memory (multiple times, in fact).
>>
>> I tried to create an alternative RocksDBStateBackend, that would store
>> each element of list in ListState to a separate key in RocksDB, so that the
>> whole blob would not have to be loaded by a single get, but a scan over
>> multiple keys could be made. Digging into the source code I found there was
>> a hierarchy of classes mirroring the public API in 'internal' package -
>> InternalKvState, InternalMergingState, InternalListState, and so on. These
>> classes however have different hierarchy than the public API classes that
>> they mirror, most notably InternalKvState is superinterface of all others.
>> This fact seems to be used on multiple places throughout the source code.
>>
>> My question is - is this intentional? Would it be possible to store each
>> element of a ListState in a separate key in RocksDB (probably by adding
>> some suffix to the actual key of the state for each element)? What are the
>> pitfalls? And is it necessary for the InternalListState to be actually
>> subinterface of InternalKvState? I find this to be a related problem.
>>
>> Many thanks for any comments or thoughts,
>>
>>   Jan
>>
>>


Re: Storing large lists into state per key

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Jan,

I cannot comment on the internal design, but you could put the data into a
RocksDBStateBackend MapState<Integer, X> where the value X is your data
type and the key is the list index. You would need another ValueState for
the current number of elements that you put into the MapState.
A MapState allows to fetch and traverse the key, value, or entry set of the
Map without loading it completely into memory.
The sets are traversed in sort order of the key, so should be in insertion
order (given that you properly increment the list index).

Best, Fabian

2017-12-12 10:23 GMT+01:00 Jan Lukavský <je...@seznam.cz>:

> Hi all,
>
> I have a question that appears as a user@ question, but brought me into
> the dev@ mailing list while I was browsing through the Flink's source
> codes. First I'll try to briefly describe my use case. I'm trying to do a
> group-by-key operation with a limited number of distinct keys (which I
> cannot control), but a non trivial count of values. The operation in the
> GBK is non-combining, so that all values per key (many) have to be stored
> in a state. Running this on testing data led to a surprise (for me), that
> even when using RocksDBStateBackend, the whole list of data is serialized
> into single binary blob and then deserialized into List, and therefore has
> to fit in memory (multiple times, in fact).
>
> I tried to create an alternative RocksDBStateBackend, that would store
> each element of list in ListState to a separate key in RocksDB, so that the
> whole blob would not have to be loaded by a single get, but a scan over
> multiple keys could be made. Digging into the source code I found there was
> a hierarchy of classes mirroring the public API in 'internal' package -
> InternalKvState, InternalMergingState, InternalListState, and so on. These
> classes however have different hierarchy than the public API classes that
> they mirror, most notably InternalKvState is superinterface of all others.
> This fact seems to be used on multiple places throughout the source code.
>
> My question is - is this intentional? Would it be possible to store each
> element of a ListState in a separate key in RocksDB (probably by adding
> some suffix to the actual key of the state for each element)? What are the
> pitfalls? And is it necessary for the InternalListState to be actually
> subinterface of InternalKvState? I find this to be a related problem.
>
> Many thanks for any comments or thoughts,
>
>  Jan
>
>