You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by jelmer <jk...@gmail.com> on 2018/05/07 12:29:59 UTC

pre-initializing global window state

Hi I am looking for some advice on how to solve the following problem

I'd like to keep track of the all time last n events received for a user.
An event on average takes up 500 bytes and here will be ten's of millions
of users for which we need to keep this information. The list of events
will be stored in cassandra for serving by an api

One way I can think of to implement this , is to use a global window per
user with a count evictor.

The problem I see with this is that the state would forever remain on the
worker nodes, in our case, in rocks db.

This means there would be a *lot* of state to include for savepoints and
checkpoints. This would make such a job very unwieldy to operate.

Is it possible to evict state from global state after some period of
inactivity. and then reinitalize the global state with data loaded from
cassandra when a new event arrives ?

Or is there an obvious better way to tackle this problem that i am missing

Any pointers would be greatly appreciated

Re: pre-initializing global window state

Posted by jelmer <jk...@gmail.com>.
Hi Ken

> 1. I would first try using RockDB with incremental checkpointing
<https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html>,
before deciding that an alternative approach is required.

That would reduce the size of the checkpoints but as far as I know not the
savepoints. If i understand correctly that is still a copy of the entire
system state. So deploying a new version of the application will be a
daunting proposition involving  saving many gigabytes of data to external
storage. and restarts that will take a very long time. Also our flink
workers are not really scoped to these kind of storage requirements

> 2 Have you considered using queryable state vs. also keeping the list of
events in Cassandra?

We looked at it before and at the time it was still somewhat experimental
and somewhat immature with regards to handling failure scenarios. And it
would require all the state to reside in flink. Which would again lead to
long restarts when creating savepoints

> 3. Depending on what you need the list of events for, often you can apply
a streaming algorithm to get good-enough (approximate) results without
storing complete state.

Hyperloglog etc makes a lot of sense in many scenario's, but unfortunately
this is not one of them. :-(



Another alternative I thought of for this problem is to  sidestep the
window abstraction and fall back to a processing function with timers


On 7 May 2018 at 17:31, Ken Krugler <kk...@transpac.com> wrote:

> Hi Jelmer,
>
> Three comments, if I understand your use case correctly…
>
> 1. I would first try using RockDB with incremental checkpointing
> <https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html>,
> before deciding that an alternative approach is required.
>
> 2. Have you considered using queryable state
> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/queryable_state.html> vs.
> also keeping the list of events in Cassandra?
>
> 3. Depending on what you need the list of events for, often you can apply
> a streaming algorithm to get good-enough (approximate) results without
> storing complete state.
>
> — Ken
>
>
> On May 7, 2018, at 5:29 AM, jelmer <jk...@gmail.com> wrote:
>
> Hi I am looking for some advice on how to solve the following problem
>
> I'd like to keep track of the all time last n events received for a user.
> An event on average takes up 500 bytes and here will be ten's of millions
> of users for which we need to keep this information. The list of events
> will be stored in cassandra for serving by an api
>
> One way I can think of to implement this , is to use a global window per
> user with a count evictor.
>
> The problem I see with this is that the state would forever remain on the
> worker nodes, in our case, in rocks db.
>
> This means there would be a *lot* of state to include for savepoints and
> checkpoints. This would make such a job very unwieldy to operate.
>
> Is it possible to evict state from global state after some period of
> inactivity. and then reinitalize the global state with data loaded from
> cassandra when a new event arrives ?
>
> Or is there an obvious better way to tackle this problem that i am missing
>
> Any pointers would be greatly appreciated
>
>
>
> --------------------------------------------
> http://about.me/kkrugler
> +1 530-210-6378
>
>

Re: pre-initializing global window state

Posted by Ken Krugler <kk...@transpac.com>.
Hi Jelmer,

Three comments, if I understand your use case correctly…

1. I would first try using RockDB with incremental checkpointing <https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html>, before deciding that an alternative approach is required.

2. Have you considered using queryable state <https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/queryable_state.html> vs. also keeping the list of events in Cassandra?

3. Depending on what you need the list of events for, often you can apply a streaming algorithm to get good-enough (approximate) results without storing complete state.

— Ken


> On May 7, 2018, at 5:29 AM, jelmer <jk...@gmail.com> wrote:
> 
> Hi I am looking for some advice on how to solve the following problem
> 
> I'd like to keep track of the all time last n events received for a user.
> An event on average takes up 500 bytes and here will be ten's of millions of users for which we need to keep this information. The list of events will be stored in cassandra for serving by an api
> 
> One way I can think of to implement this , is to use a global window per user with a count evictor. 
> 
> The problem I see with this is that the state would forever remain on the worker nodes, in our case, in rocks db.
> 
> This means there would be a *lot* of state to include for savepoints and checkpoints. This would make such a job very unwieldy to operate.
> 
> Is it possible to evict state from global state after some period of inactivity. and then reinitalize the global state with data loaded from cassandra when a new event arrives ? 
> 
> Or is there an obvious better way to tackle this problem that i am missing
> 
> Any pointers would be greatly appreciated
> 
> 

--------------------------------------------
http://about.me/kkrugler
+1 530-210-6378