Posted to user@flink.apache.org by Ken Krugler <kk...@transpac.com> on 2018/02/18 23:10:12 UTC

Iterating over state entries

Hi there,

I’ve got a MapState where I need to iterate over the entries.

This currently isn’t supported (at least for RocksDB), AFAIK, though there is an issue/PR <https://issues.apache.org/jira/browse/FLINK-8297> to improve this.

The best solution I’ve seen is what Fabian proposed, which involves keeping a ValueState with a count of entries, and then having the key for the MapState be the index.

> I cannot comment on the internal design, but you could put the data into a
> RocksDBStateBackend MapState<Integer, X> where the value X is your data
> type and the key is the list index. You would need another ValueState for
> the current number of elements that you put into the MapState.
> A MapState allows you to fetch and traverse the key, value, or entry set of the
> Map without loading it completely into memory.
> The sets are traversed in sort order of the key, so should be in insertion
> order (given that you properly increment the list index).
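A sketch added for illustration of the pattern quoted above, written in plain Java so that it runs without Flink. The class name IndexedListSketch and its methods are hypothetical. The TreeMap stands in for a MapState<Integer, X> and the int field for the ValueState that holds the element count; the TreeMap's sorted traversal models the sort-order traversal the quote describes. It shows only the bookkeeping (store under the next index, then increment the count), not how a state backend stores or orders keys.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Stand-in for the state of one key. In a Flink job the map would be a
// MapState<Integer, X> and the counter a ValueState<Integer>; a TreeMap and an
// int field are used here only so the sketch runs without Flink.
class IndexedListSketch<X> {
    private final TreeMap<Integer, X> entries = new TreeMap<>(); // index -> element
    private int count = 0;                                       // elements stored so far

    // Append: store the element under the next free index, then bump the counter.
    void append(X value) {
        entries.put(count, value);
        count++;
    }

    // Point access by list index; returns null if the index was never written.
    X get(int index) {
        return entries.get(index);
    }

    int size() {
        return count;
    }

    // Full traversal in key order, which equals insertion order here because
    // indices are handed out in increasing order.
    List<X> inOrder() {
        List<X> out = new ArrayList<>();
        for (Map.Entry<Integer, X> e : entries.entrySet()) {
            out.add(e.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        IndexedListSketch<String> list = new IndexedListSketch<>();
        list.append("a");
        list.append("b");
        list.append("c");
        System.out.println(list.inOrder() + " size=" + list.size());
    }
}
```

In a real job both pieces of state would be registered per key, so each key of the keyed stream would get its own list and its own counter.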


This effectively lets you iterate over all of the map entries for a given (keyed) state - though it doesn’t solve the “I have to iterate over _every_ entry” situation.

Is this currently the best option?

Thanks,

— Ken

--------------------------------------------
http://about.me/kkrugler
+1 530-210-6378


Re: Iterating over state entries

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Ken,

That's correct. The iterator will become invalid once you leave the method.
If you are only interested in a few specific entries, then index access is
probably the most efficient approach.
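A sketch added for illustration of how incremental processing by index could look, in plain Java rather than Flink code. BatchCursorSketch and nextBatch are hypothetical names. The HashMap and the two int fields stand in for a MapState<Integer, X> and two ValueState<Integer> instances (the element count and the saved position). The idea it illustrates is that the position, not the iterator, is what gets kept between calls.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: resumes from a saved position instead of a saved
// iterator. The fields stand in for Flink state; nothing here is Flink API.
class BatchCursorSketch<X> {
    private final Map<Integer, X> entries = new HashMap<>(); // index -> element
    private int count = 0;   // number of elements appended so far
    private int cursor = 0;  // index of the next element to process

    void append(X value) {
        entries.put(count, value);
        count++;
    }

    // Returns up to batchSize elements starting at the cursor, using one point
    // lookup per element, and advances the cursor past them.
    List<X> nextBatch(int batchSize) {
        int n = Math.min(batchSize, count - cursor);
        List<X> batch = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            batch.add(entries.get(cursor));
            cursor++;
        }
        return batch;
    }

    public static void main(String[] args) {
        BatchCursorSketch<Integer> state = new BatchCursorSketch<>();
        for (int i = 0; i < 25; i++) {
            state.append(i * i);
        }
        // Each call models a separate invocation of the user function.
        System.out.println(state.nextBatch(10));
        System.out.println(state.nextBatch(10));
        System.out.println(state.nextBatch(10));
    }
}
```

Once the cursor reaches the count, further calls return an empty batch until more elements are appended.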

Best, Fabian

2018-02-20 1:03 GMT+01:00 Ken Krugler <kk...@transpac.com>:

> Hi Till,
>
> On Feb 19, 2018, at 8:14 AM, Till Rohrmann <tr...@apache.org> wrote:
>
> Hi Ken,
>
> just for my clarification, the `RocksDBMapState#entries` method does not
> satisfy your requirements? This method does not allow you to iterate across
> different keys of your keyed stream of course. But it should allow you to
> iterate over the different entries for a given key of your keyed stream.
>
>
> As per my email to Fabian, I should have been more precise in my
> requirements.
>
> I need to do incremental iteration of the entries, versus a complete
> iteration.
>
> And I'm assuming I can't keep the iterator around across calls to the
> function.
>
> Regards,
>
> — Ken

Re: Iterating over state entries

Posted by Ken Krugler <kk...@transpac.com>.
Hi Till,

> On Feb 19, 2018, at 8:14 AM, Till Rohrmann <tr...@apache.org> wrote:
> 
> Hi Ken,
> 
> just for my clarification, the `RocksDBMapState#entries` method does not satisfy your requirements? This method does not allow you to iterate across different keys of your keyed stream of course. But it should allow you to iterate over the different entries for a given key of your keyed stream.

As per my email to Fabian, I should have been more precise in my requirements.

I need to do incremental iteration of the entries, versus a complete iteration.

And I'm assuming I can't keep the iterator around across calls to the function.

Regards,

— Ken



--------------------------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr


Re: Iterating over state entries

Posted by Till Rohrmann <tr...@apache.org>.
Hi Ken,

just for my clarification, the `RocksDBMapState#entries` method does not
satisfy your requirements? This method does not allow you to iterate across
different keys of your keyed stream of course. But it should allow you to
iterate over the different entries for a given key of your keyed stream.
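A sketch added for illustration of the scoping described above, as a toy model in plain Java. KeyedMapSketch and entriesFor are hypothetical names, and the nested map is only a stand-in for keyed map state: each key of the keyed stream has its own independent map, and a traversal sees only the entries that belong to the key currently being processed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of keyed map state: one independent map per stream key.
class KeyedMapSketch {
    private final Map<String, TreeMap<Integer, String>> perKey = new HashMap<>();

    // Write an entry into the map that belongs to the given stream key.
    void put(String streamKey, int mapKey, String value) {
        perKey.computeIfAbsent(streamKey, k -> new TreeMap<>()).put(mapKey, value);
    }

    // Values visible while processing a record with the given stream key.
    // Entries stored under other stream keys are not reachable from here.
    List<String> entriesFor(String streamKey) {
        TreeMap<Integer, String> map = perKey.get(streamKey);
        return map == null ? new ArrayList<>() : new ArrayList<>(map.values());
    }

    public static void main(String[] args) {
        KeyedMapSketch state = new KeyedMapSketch();
        state.put("user-1", 0, "a");
        state.put("user-2", 0, "x");
        state.put("user-1", 1, "b");
        System.out.println(state.entriesFor("user-1"));
        System.out.println(state.entriesFor("user-2"));
    }
}
```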

Cheers,
Till


Re: Iterating over state entries

Posted by Ken Krugler <kk...@transpac.com>.
Hi Fabian,

> I'd like to clarify what I said before.
> 
> By using MapState you mainly gain two things:
> - position access by index
> - the full list does not need to be deserialized to read values (which is how ListState works).
> 
> Point access should obviously be done by get(index).
> However, iterating over the list should be done by iterating over the entry (or value) set. The entry set iterator will prefetch multiple entries and only deserialize the key / values when you access them. This reduces the number of RocksDB look-ups.

Sorry, I should have been more precise in my description below. I have to do incremental iteration (e.g. process the next 10 entries).

I’m assuming I can’t hold onto the iterator across calls to a function, right?

If so, then making get(index) calls via the technique described below is currently the most efficient approach, yes?

Thanks,

— Ken


> 2018-02-19 0:10 GMT+01:00 Ken Krugler <kkrugler_lists@transpac.com>:
> Hi there,
> 
> I’ve got a MapState where I need to iterate over the entries.
> 
> This currently isn’t supported (at least for RocksDB), AFAIK, though there is an issue/PR <https://issues.apache.org/jira/browse/FLINK-8297> to improve this.
> 
> The best solution I’ve seen is what Fabian proposed, which involves keeping a ValueState with a count of entries, and then having the key for the MapState be the index.
> 
>> I cannot comment on the internal design, but you could put the data into a
>> RocksDBStateBackend MapState<Integer, X> where the value X is your data
>> type and the key is the list index. You would need another ValueState for
>> the current number of elements that you put into the MapState.
>> A MapState allows you to fetch and traverse the key, value, or entry set of the
>> Map without loading it completely into memory.
>> The sets are traversed in sort order of the key, so should be in insertion
>> order (given that you properly increment the list index).
> 
> 
> This effectively lets you iterate over all of the map entries for a given (keyed) state - though it doesn’t solve the “I have to iterate over _every_ entry” situation.
> 
> Is this currently the best option?
> 
> Thanks,
> 
> — Ken
> 
> --------------------------------------------
> http://about.me/kkrugler
> +1 530-210-6378
> 



Re: Iterating over state entries

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Ken,

I'd like to clarify what I said before.

By using MapState you mainly gain two things:
- position access by index
- the full list does not need to be deserialized to read values (which is
how ListState works).

Point access should obviously be done by get(index).
However, iterating over the list should be done by iterating over the entry
(or value) set. The entry set iterator will prefetch multiple entries and
only deserialize the key / values when you access them. This reduces the
number of RocksDB look-ups.
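A sketch added for illustration that contrasts the two access styles, as a toy model in plain Java. CountingStoreSketch and its counters are hypothetical and stand in for a MapState<Integer, String>. The counters only record how many separate calls each style makes against the store; they say nothing about real RocksDB costs, which depend on the backend and its prefetching.

```java
import java.util.Map;
import java.util.TreeMap;

// Toy store that counts how it is accessed. The counters are illustrative
// only and are not part of any Flink or RocksDB API.
class CountingStoreSketch {
    private final TreeMap<Integer, String> data = new TreeMap<>();
    int pointLookups = 0; // number of get(index) calls made
    int scansOpened = 0;  // number of entry-set traversals started

    void put(int index, String value) {
        data.put(index, value);
    }

    String get(int index) {
        pointLookups++;
        return data.get(index);
    }

    Iterable<Map.Entry<Integer, String>> entries() {
        scansOpened++;
        return data.entrySet();
    }

    // Style 1: walk the list with one point lookup per index.
    static String joinByIndex(CountingStoreSketch store, int count) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < count; i++) {
            sb.append(store.get(i));
        }
        return sb.toString();
    }

    // Style 2: walk the list through a single entry-set traversal.
    static String joinByEntries(CountingStoreSketch store) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Integer, String> e : store.entries()) {
            sb.append(e.getValue());
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        CountingStoreSketch store = new CountingStoreSketch();
        String[] values = {"a", "b", "c", "d"};
        for (int i = 0; i < values.length; i++) {
            store.put(i, values[i]);
        }
        System.out.println(joinByIndex(store, values.length) + " point lookups: " + store.pointLookups);
        System.out.println(joinByEntries(store) + " traversals: " + store.scansOpened);
    }
}
```

Both styles read the same data in the same order; the difference in this model is one call per element versus one traversal for the whole list.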

Best,
Fabian

