Posted to user@flink.apache.org by Avi Levi <av...@bluevoyant.com> on 2018/11/21 11:36:22 UTC

your advice please regarding state

Hi,
I am very new to Flink, so please be gentle :)

*The challenge:*
I have a road sensor that should scan billions of cars per day. For a start,
I want to recognise whether each car that passes by is new or not. New cars
(never seen before by that sensor) will be placed on a different Kafka
topic than the others (two topics in total, one for new cars and one for old),
under the assumption that the state will contain billions of unique car
IDs.

*Suggested Solutions*
My question is which approach is better.
Both approaches use RocksDB.

1. Use ValueState and split the stream like this:
    val domainsSrc = env
      .addSource(consumer)
      .keyBy(car => car.id)
      .map(...)
and check whether the state value is null to recognise new cars. If a car is
new, then I will update the state.
How will the persisted data be sharded among the nodes in the cluster
(let's say that I have 10 nodes)?

2. Use MapState and partition the stream into groups by some arbitrary
factor, e.g.
    val domainsSrc = env
      .addSource(consumer)
      .keyBy { car =>
        val h = car.id.hashCode % partitionFactor
        math.abs(h)
      }
      .map(...)
and check mapState.keys.contains(car.id); if the ID is not there, add
it to the state.

Which approach is better?

Thanks in advance
Avi

Re: your advice please regarding state

Posted by Avi Levi <av...@bluevoyant.com>.
Thank you very much. Got it.

In reply to Fabian Hueske <fh...@gmail.com>, Tue, Nov 27, 2018 at 12:53 PM.

Re: your advice please regarding state

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Avi,

I'd definitely go for approach #1.
Flink will hash-partition the records across all nodes. This is basically
the same as a distributed key-value store sharding its keys.
I would not try to fine-tune the partitioning. You should try to use as
many keys as possible to ensure an even distribution of keys. This will also
allow you to scale your application to more tasks later.

Best, Fabian
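
To make the hash partitioning described above more concrete, here is a
minimal standalone sketch in plain Scala (no Flink dependency). It assumes 10
parallel subtasks and 128 key groups, and it uses
scala.util.hashing.MurmurHash3 as a stand-in for Flink's internal hash
function, so the exact assignments will differ from a real job. The names
KeyGroupSketch, keyGroupFor and subtaskFor are hypothetical and exist only for
this illustration.

```scala
import scala.util.hashing.MurmurHash3

object KeyGroupSketch {
  // Assumed values for illustration: 10 parallel subtasks, 128 key groups.
  val parallelism = 10
  val maxParallelism = 128

  // Map a key to a key group. MurmurHash3 is a stand-in for Flink's
  // internal hash, so real assignments will not match these.
  def keyGroupFor(carId: String): Int =
    Math.floorMod(MurmurHash3.stringHash(carId), maxParallelism)

  // Each subtask owns a contiguous range of key groups.
  def subtaskFor(keyGroup: Int): Int =
    keyGroup * parallelism / maxParallelism

  def main(args: Array[String]): Unit = {
    // Count how many of 100,000 synthetic car ids land on each subtask.
    val counts = (1 to 100000)
      .map(i => subtaskFor(keyGroupFor(s"car-$i")))
      .groupBy(identity)
      .map { case (subtask, hits) => subtask -> hits.size }

    counts.toSeq.sortBy(_._1).foreach { case (subtask, n) =>
      println(s"subtask $subtask: $n ids")
    }
  }
}
```

With many distinct car ids, each subtask should end up with a roughly similar
share of the keys, which is why a hand-rolled partitionFactor as in approach
#2 adds nothing on top of keying by car.id directly.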

In reply to yinhua.dai <yinhua.2018@outlook.com>, Tue, Nov 27, 2018 at 05:21.

Re: your advice please regarding state

Posted by "yinhua.dai" <yi...@outlook.com>.
In general, approach #1 is OK, but you may have to use a hash-based key
selector if your data is heavily skewed.

Re: your advice please regarding state

Posted by Avi Levi <av...@bluevoyant.com>.
Thanks a lot! Got it :)

In reply to Jamie Grier <jg...@lyft.com>, Wed, Nov 21, 2018 at 11:40 PM.

Re: your advice please regarding state

Posted by Jamie Grier <jg...@lyft.com>.
Hi Avi,

The typical approach would be as you've described in #1.  #2 is not
necessary -- #1 is already doing basically exactly that.

-Jamie
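
For readers who want to see approach #1 spelled out, the following is a
minimal sketch, not a definitive implementation. It assumes Flink 1.5 or later
with the Scala DataStream API on the classpath. The Car case class, the
NewCarJob and NewCarDetector names, the "seen-cars" tag and the fromElements
source are hypothetical stand-ins for the Kafka source and record type in the
original question. The RocksDB state backend would be configured separately
and is not shown.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical record type; the thread only mentions a car id.
case class Car(id: String)

object NewCarJob {
  // Side output for cars that were already seen (hypothetical tag name).
  val seenTag: OutputTag[Car] = OutputTag[Car]("seen-cars")

  class NewCarDetector extends KeyedProcessFunction[String, Car, Car] {
    // One flag per key (car id); value() returns null until the first update.
    @transient private var seen: ValueState[java.lang.Boolean] = _

    override def open(parameters: Configuration): Unit = {
      seen = getRuntimeContext.getState(
        new ValueStateDescriptor[java.lang.Boolean]("seen", classOf[java.lang.Boolean]))
    }

    override def processElement(
        car: Car,
        ctx: KeyedProcessFunction[String, Car, Car]#Context,
        out: Collector[Car]): Unit = {
      if (seen.value() == null) {
        seen.update(java.lang.Boolean.TRUE)
        out.collect(car)          // first sighting: main output
      } else {
        ctx.output(seenTag, car)  // repeat sighting: side output
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Stand-in for the Kafka source used in the thread.
    val cars = env.fromElements(Car("a"), Car("b"), Car("a"))

    val newCars = cars.keyBy(_.id).process(new NewCarDetector)
    val seenCars = newCars.getSideOutput(seenTag)

    newCars.print()   // in the real job: sink to the "new" Kafka topic
    seenCars.print()  // in the real job: sink to the "old" Kafka topic
    env.execute("new-car-detection")
  }
}
```

Because the stream is keyed by car id, each ValueState instance is scoped to a
single car, so a null check is enough to tell a first sighting from a repeat.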

