Posted to user@flink.apache.org by Ken Krugler <kk...@transpac.com> on 2017/04/27 01:02:47 UTC

Iterating over keys in state backend

Is there a way to iterate over all of the key/value entries in the state backend, from within the operator that’s making use of the same?

E.g. I’ve got a ReducingState, and on a timed interval (inside of the onTimer method) I need to iterate over all KV state and emit the N “best” entries.
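(For reference, a minimal sketch of the kind of setup being described: a ProcessFunction on a keyed stream with a per-key ReducingState and a processing-time timer. The UrlScore POJO, state name, and interval are illustrative and not from this thread; note that onTimer only ever sees the state of the current key, which is exactly why the question comes up.)

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class BestUrlFunction extends ProcessFunction<BestUrlFunction.UrlScore, BestUrlFunction.UrlScore> {

    private static final long EMIT_INTERVAL_MS = 60_000L; // illustrative interval

    private transient ReducingState<UrlScore> bestForKey;

    @Override
    public void open(Configuration parameters) {
        // Keep only the highest-scoring entry seen for the current key.
        bestForKey = getRuntimeContext().getReducingState(new ReducingStateDescriptor<>(
                "best-entry",
                (ReduceFunction<UrlScore>) (a, b) -> a.score >= b.score ? a : b,
                UrlScore.class));
    }

    @Override
    public void processElement(UrlScore value, Context ctx, Collector<UrlScore> out) throws Exception {
        bestForKey.add(value);
        // Timers are always scoped to a key; aligning to the interval lets repeated registrations coalesce.
        long now = ctx.timerService().currentProcessingTime();
        ctx.timerService().registerProcessingTimeTimer((now / EMIT_INTERVAL_MS + 1) * EMIT_INTERVAL_MS);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<UrlScore> out) throws Exception {
        // Only the current key's state is visible here; there is no supported way
        // to walk all keys of the backend from inside the operator.
        UrlScore best = bestForKey.get();
        if (best != null) {
            out.collect(best);
        }
    }

    // Illustrative POJO, not from the thread.
    public static class UrlScore {
        public String url;
        public double score;
        public UrlScore() {}
    }
}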

What’s the recommended approach?

Thanks,

— Ken


Re: Iterating over keys in state backend

Posted by Ken Krugler <kk...@transpac.com>.
Hi Kostas,

In my use case I’m keeping track of the state of URLs during a web crawl.

This is both the current state (“URL X should be crawled at time Y, and has an estimated value of Z”) and the source of URLs to be fed into the crawl infrastructure - it’s a floor wax and a dessert topping.

Which is why it’s a process function, so that I can query this “crawlDB” to get URLs to emit to the fetch queue, independent of when/if new URLs are flowing in from some external source.

And yes, I could use an external, queryable system to handle this (e.g. Elasticsearch), but at a scale of billions of URLs, having something custom is of significant value in terms of performance and resource costs.

There are things I could do to better leverage Flink’s state management, so I have to do less in this custom DB (e.g. archiving low-scoring URLs comes to mind).

But after a few whiteboard sessions, it still seems like I’m going to have to add checkpointing/snapshotting support to my custom crawlDB.
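(For illustration only: one way a custom store can hook into Flink checkpoints is the CheckpointedFunction interface, copying the store's contents into operator list state on each snapshot. CrawlDB below is a trivial in-memory stand-in and all names are hypothetical; at billions of entries, a full copy per checkpoint is exactly the part that isn't trivial.)

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class CrawlDBFunction extends ProcessFunction<String, String> implements CheckpointedFunction {

    private transient CrawlDB crawlDB;
    private transient ListState<String> checkpointedEntries;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedEntries = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("crawldb-entries", String.class));
        crawlDB = new CrawlDB();
        if (context.isRestored()) {
            // Rebuild the custom store from the last successful checkpoint.
            for (String entry : checkpointedEntries.get()) {
                crawlDB.add(entry);
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Naive approach: copy every entry into operator state on each checkpoint.
        checkpointedEntries.clear();
        for (String entry : crawlDB.allEntries()) {
            checkpointedEntries.add(entry);
        }
    }

    @Override
    public void processElement(String url, Context ctx, Collector<String> out) throws Exception {
        crawlDB.add(url);
    }

    // Trivial in-memory stand-in for the custom RAM+disk store.
    public static class CrawlDB {
        private final List<String> entries = new ArrayList<>();
        public void add(String entry) { entries.add(entry); }
        public Iterable<String> allEntries() { return entries; }
    }
}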

Thanks,

— Ken


> On Apr 28, 2017, at 1:28am, Kostas Kloudas <k....@data-artisans.com> wrote:
> 
> Hi Ken,
> 
> So you have a queue where elements are sorted by timestamp and score, and when the time (event time, I suppose) passes 
> the timestamp of an element, you want to fetch the element and:
>  - if the score is too low, you archive it;
>  - if the score is OK, you emit it.
> 
> If I get it right, then if your stream is keyed you have a queue and an “archive” state per key, 
> if not, you have a global queue for all elements, which can be seen as a keyed stream on a dummy key, right?
> By the way, timers in Flink have to be associated with a key, so I suppose that if you are using timers you are in the first case (keyed stream).
> 
> In this case, why do you need access to the state of all the keys?
> 
> Also it may be worth having a look at the CEP operator in the Flink codebase.
> There you also have a queue per key, where events are sorted by timestamp, and at each watermark, 
> elements with timestamps smaller than the watermark are processed.
> 
> Hope this helps,
> Kostas
> 
>> On Apr 28, 2017, at 4:08 AM, Ken Krugler <kkrugler_lists@transpac.com> wrote:
>> 
>> Hi Kostas,
>> 
>> Thanks for responding. Details in-line below.
>> 
>>> On Apr 27, 2017, at 1:19am, Kostas Kloudas <k.kloudas@data-artisans.com> wrote:
>>> 
>>> Hi Ken,
>>> 
>>> Unfortunately, iterating over all keys is not currently supported.
>>> 
>>> Do you have your own custom operator (because you mention “from within the operator…”) or
>>> do you have a process function (because you mention the “onTimer” method)?
>> 
>> Currently it’s a process function, but I might be able to just use a regular operator.
>> 
>>> Also, could you describe your use case a bit more?  You have a periodic timer per key and when
>>> a timer for a given key fires you want to have access to the state of all the keys?
>> 
>> The timer bit is because I’m filling an async queue, and thus need to trigger emitting tuples to the operator’s output stream independent of inbound tuples.
>> The main problems I’m trying to solve (without requiring a separate scalable DB infrastructure) are:
>> 
>>  - entries have an associated “earliest processing time”. I don’t want to send these through the system until that time trigger has passed.
>>  - entries have an associated “score”. I want to favor processing high scoring entries over low scoring entries.
>>  - if an entry’s score is too low, I want to archive it, rather than constantly re-evaluating it using the above two factors.
>> 
>> I’ve got my own custom DB that is working for the above, and scales to target sizes of 1B+ entries per server by using a mixture of RAM and disk.
>> 
>> But having to checkpoint it isn’t trivial.
>> 
>> So I thought that if there was a way to (occasionally) iterate over the keys in the state backend, I could get what I needed with the minimum effort.
>> 
>> But sounds like that’s not possible currently.
>> 
>> Thanks,
>> 
>> — Ken
>> 
>> 
>> 
>>>> On Apr 27, 2017, at 3:02 AM, Ken Krugler <kkrugler_lists@transpac.com> wrote:
>>>> 
>>>> Is there a way to iterate over all of the key/value entries in the state backend, from within the operator that’s making use of the same?
>>>> 
>>>> E.g. I’ve got a ReducingState, and on a timed interval (inside of the onTimer method) I need to iterate over all KV state and emit the N “best” entries.
>>>> 
>>>> What’s the recommended approach?
>>>> 
>>>> Thanks,
>>>> 
>>>> — Ken
>>>> 
>>> 
>> 
>> --------------------------
>> Ken Krugler
>> +1 530-210-6378
>> http://www.scaleunlimited.com
>> custom big data solutions & training
>> Hadoop, Cascading, Cassandra & Solr
> 

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr




Re: Iterating over keys in state backend

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Ken,

So you have a queue where elements are sorted by timestamp and score, and when the time (event time, I suppose) passes 
the timestamp of an element, you want to fetch the element and:
 - if the score is too low, you archive it;
 - if the score is OK, you emit it.

If I get it right, then if your stream is keyed you have a queue and an “archive” state per key, 
if not, you have a global queue for all elements, which can be seen as a keyed stream on a dummy key, right?
By the way, timers in Flink have to be associated with a key, so I suppose that if you are using timers you are in the first case (keyed stream).

In this case, why do you need access to the state of all the keys?

Also it may be worth having a look at the CEP operator in the Flink codebase.
There you also have a queue per key, where events are sorted by timestamp, and at each watermark, 
elements with timestamps smaller than the watermark are processed.
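(A minimal sketch of that per-key, timer-driven pattern, with an illustrative Entry POJO and score threshold that are not from this thread: register an event-time timer at the entry's earliest processing time, then archive or emit in onTimer depending on the score.)

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class ScoredEntryFunction extends ProcessFunction<ScoredEntryFunction.Entry, ScoredEntryFunction.Entry> {

    private static final double MIN_SCORE = 0.1; // illustrative threshold

    private transient ValueState<Entry> pending;   // the entry waiting for its time to come
    private transient ListState<Entry> archive;    // low-scoring entries parked per key

    @Override
    public void open(Configuration parameters) {
        pending = getRuntimeContext().getState(new ValueStateDescriptor<>("pending-entry", Entry.class));
        archive = getRuntimeContext().getListState(new ListStateDescriptor<>("archived-entries", Entry.class));
    }

    @Override
    public void processElement(Entry entry, Context ctx, Collector<Entry> out) throws Exception {
        pending.update(entry);
        // Fire once the entry's "earliest processing time" has passed (event time).
        ctx.timerService().registerEventTimeTimer(entry.earliestProcessingTime);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Entry> out) throws Exception {
        Entry entry = pending.value();
        if (entry == null) {
            return;
        }
        if (entry.score < MIN_SCORE) {
            archive.add(entry);   // too low: park it instead of re-evaluating it
        } else {
            out.collect(entry);   // good enough: emit it downstream
        }
        pending.clear();
    }

    // Illustrative POJO, not from the thread.
    public static class Entry {
        public String url;
        public double score;
        public long earliestProcessingTime;
        public Entry() {}
    }
}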

Hope this helps,
Kostas

> On Apr 28, 2017, at 4:08 AM, Ken Krugler <kk...@transpac.com> wrote:
> 
> Hi Kostas,
> 
> Thanks for responding. Details in-line below.
> 
>> On Apr 27, 2017, at 1:19am, Kostas Kloudas <k.kloudas@data-artisans.com> wrote:
>> 
>> Hi Ken,
>> 
>> Unfortunately, iterating over all keys is not currently supported.
>> 
>> Do you have your own custom operator (because you mention “from within the operator…”) or
>> do you have a process function (because you mention the “onTimer” method)?
> 
> Currently it’s a process function, but I might be able to just use a regular operator.
> 
>> Also, could you describe your use case a bit more?  You have a periodic timer per key and when
>> a timer for a given key fires you want to have access to the state of all the keys?
> 
> The timer bit is because I’m filling an async queue, and thus need to trigger emitting tuples to the operator’s output stream independent of inbound tuples.
> The main problems I’m trying to solve (without requiring a separate scalable DB infrastructure) are:
> 
>  - entries have an associated “earliest processing time”. I don’t want to send these through the system until that time trigger has passed.
>  - entries have an associated “score”. I want to favor processing high scoring entries over low scoring entries.
>  - if an entry’s score is too low, I want to archive it, rather than constantly re-evaluating it using the above two factors.
> 
> I’ve got my own custom DB that is working for the above, and scales to target sizes of 1B+ entries per server by using a mixture of RAM and disk.
> 
> But having to checkpoint it isn’t trivial.
> 
> So I thought that if there was a way to (occasionally) iterate over the keys in the state backend, I could get what I needed with the minimum effort.
> 
> But sounds like that’s not possible currently.
> 
> Thanks,
> 
> — Ken
> 
> 
> 
>>> On Apr 27, 2017, at 3:02 AM, Ken Krugler <kkrugler_lists@transpac.com> wrote:
>>> 
>>> Is there a way to iterate over all of the key/value entries in the state backend, from within the operator that’s making use of the same?
>>> 
>>> E.g. I’ve got a ReducingState, and on a timed interval (inside of the onTimer method) I need to iterate over all KV state and emit the N “best” entries.
>>> 
>>> What’s the recommended approach?
>>> 
>>> Thanks,
>>> 
>>> — Ken
>>> 
>> 
> 
> --------------------------
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr


Re: Iterating over keys in state backend

Posted by Ken Krugler <kk...@transpac.com>.
Hi Kostas,

Thanks for responding. Details in-line below.

> On Apr 27, 2017, at 1:19am, Kostas Kloudas <k....@data-artisans.com> wrote:
> 
> Hi Ken,
> 
> Unfortunately, iterating over all keys is not currently supported.
> 
> Do you have your own custom operator (because you mention “from within the operator…”) or
> do you have a process function (because you mention the “onTimer” method)?

Currently it’s a process function, but I might be able to just use a regular operator.

> Also, could you describe your use case a bit more?  You have a periodic timer per key and when
> a timer for a given key fires you want to have access to the state of all the keys?

The timer bit is because I’m filling an async queue, and thus need to trigger emitting tuples to the operator’s output stream independent of inbound tuples.

The main problems I’m trying to solve (without requiring a separate scalable DB infrastructure) are:

 - entries have an associated “earliest processing time”. I don’t want to send these through the system until that time trigger has passed.
 - entries have an associated “score”. I want to favor processing high scoring entries over low scoring entries.
 - if an entry’s score is too low, I want to archive it, rather than constantly re-evaluating it using the above two factors.

I’ve got my own custom DB that is working for the above, and scales to target sizes of 1B+ entries per server by using a mixture of RAM and disk.

But having to checkpoint it isn’t trivial.

So I thought that if there was a way to (occasionally) iterate over the keys in the state backend, I could get what I needed with the minimum effort.

But sounds like that’s not possible currently.
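(One workaround in the spirit of the dummy-key idea Kostas mentions in his follow-up: key everything on a constant, so a single key's ListState holds all entries and a periodic timer can scan it for the N best. A rough sketch with illustrative names and constants, not from the thread; the obvious downside is that one dummy key puts all of the state and work on a single parallel subtask, which is at odds with the billions-of-URLs scale described above.)

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class TopNOnDummyKey extends ProcessFunction<TopNOnDummyKey.CrawlEntry, TopNOnDummyKey.CrawlEntry> {

    private static final int TOP_N = 10;                   // illustrative
    private static final long SCAN_INTERVAL_MS = 60_000L;  // illustrative

    private transient ListState<CrawlEntry> entries;

    @Override
    public void open(Configuration parameters) {
        entries = getRuntimeContext().getListState(new ListStateDescriptor<>("all-entries", CrawlEntry.class));
    }

    @Override
    public void processElement(CrawlEntry entry, Context ctx, Collector<CrawlEntry> out) throws Exception {
        entries.add(entry);
        // Align timers to the scan interval so repeated registrations coalesce.
        long now = ctx.timerService().currentProcessingTime();
        ctx.timerService().registerProcessingTimeTimer((now / SCAN_INTERVAL_MS + 1) * SCAN_INTERVAL_MS);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<CrawlEntry> out) throws Exception {
        // With a single dummy key, this one list is effectively "all KV state".
        List<CrawlEntry> all = new ArrayList<>();
        for (CrawlEntry e : entries.get()) {
            all.add(e);
        }
        all.sort(Comparator.comparingDouble((CrawlEntry e) -> e.score).reversed());
        for (CrawlEntry e : all.subList(0, Math.min(TOP_N, all.size()))) {
            out.collect(e);
        }
    }

    // Illustrative POJO, not from the thread.
    public static class CrawlEntry {
        public String url;
        public double score;
        public CrawlEntry() {}
    }
}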

Thanks,

— Ken



>> On Apr 27, 2017, at 3:02 AM, Ken Krugler <kkrugler_lists@transpac.com> wrote:
>> 
>> Is there a way to iterate over all of the key/value entries in the state backend, from within the operator that’s making use of the same?
>> 
>> E.g. I’ve got a ReducingState, and on a timed interval (inside of the onTimer method) I need to iterate over all KV state and emit the N “best” entries.
>> 
>> What’s the recommended approach?
>> 
>> Thanks,
>> 
>> — Ken
>> 
> 

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr




Re: Iterating over keys in state backend

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Ken,

Unfortunately, iterating over all keys is not currently supported.

Do you have your own custom operator (because you mention “from within the operator…”) or
do you have a process function (because you mention the “onTimer” method)?

Also, could you describe your use case a bit more?  You have a periodic timer per key and when
a timer for a given key fires you want to have access to the state of all the keys?

Thanks,
Kostas

> On Apr 27, 2017, at 3:02 AM, Ken Krugler <kk...@transpac.com> wrote:
> 
> Is there a way to iterate over all of the key/value entries in the state backend, from within the operator that’s making use of the same?
> 
> E.g. I’ve got a ReducingState, and on a timed interval (inside of the onTimer method) I need to iterate over all KV state and emit the N “best” entries.
> 
> What’s the recommended approach?
> 
> Thanks,
> 
> — Ken
>