Posted to user@flink.apache.org by Tony Wei <to...@gmail.com> on 2017/09/19 13:58:28 UTC

How to use operator list state like a HashMap?

Hi,

I have a basic streaming job that continuously persists data from Kafka to
S3.
The data are grouped by some dimensions, with a limited amount buffered per
group.

Originally, I used 'keyBy' and keyed state to fulfill the requirement.
However, because the data is extremely skewed, I switched to a map function
that aggregates the data within each parallel partition, so that I can
balance the amount of data across subtasks.

Inside the map function I use a HashMap to store data per dimension, and I
convert it to operator list state when 'snapshotState()' is called.
But that causes another problem: because I can't access operator list state
directly the way I can access keyed state on a KeyedStream, I have to keep
all of that state on the heap. That limits the amount of data I can cache
in the map function.
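
Roughly, the map function looks like the sketch below (heavily simplified;
the Tuple2<dimension, payload> record type, the state name, and the flush
logic are just placeholders for what I actually do):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class BufferingMapper
        implements MapFunction<Tuple2<String, String>, Tuple2<String, String>>,
                   CheckpointedFunction {

    // in-memory buffer on the heap, grouped by dimension
    private transient Map<String, List<String>> buffer;

    // operator list state, only touched on snapshot / restore
    private transient ListState<Tuple2<String, String>> checkpointed;

    @Override
    public Tuple2<String, String> map(Tuple2<String, String> record) {
        buffer.computeIfAbsent(record.f0, k -> new ArrayList<>()).add(record.f1);
        // ... flush a group to S3 once it reaches its size limit ...
        return record;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointed.clear();
        for (Map.Entry<String, List<String>> group : buffer.entrySet()) {
            for (String payload : group.getValue()) {
                checkpointed.add(Tuple2.of(group.getKey(), payload));
            }
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        buffer = new HashMap<>();
        checkpointed = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>(
                        "buffered-records",
                        TypeInformation.of(new TypeHint<Tuple2<String, String>>() {})));
        if (context.isRestored()) {
            for (Tuple2<String, String> entry : checkpointed.get()) {
                buffer.computeIfAbsent(entry.f0, k -> new ArrayList<>()).add(entry.f1);
            }
        }
    }
}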

I was wondering if there is any good suggestion for dealing with this
problem, or a better way to use operator list state in this scenario.
Thank you.


Best Regards,
Tony Wei

Re: How to use operator list state like a HashMap?

Posted by Tony Wei <to...@gmail.com>.
Hi Fabian,

This is good advice, but I have already tried adding a random value to my
data and it did not seem to help much.

The key set of my data is small, around 10 ~ 20 keys. If the range of the
random number is small, the distribution might not improve and could even
get worse. I think the reason is that KeyedStream uses murmur hash to
partition keys, and with so few keys it doesn't guarantee a fair
distribution.
Of course, if the range of the random number is large enough, an even
distribution becomes more likely, but then I need to cache more data in
state, because the data for each original key would be split across a
larger key set. I would prefer to avoid that.
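
For reference, I estimated the distribution with a small standalone check,
roughly like the one below. It relies on KeyGroupRangeAssignment from
flink-runtime, which as far as I know is the same assignment that keyBy
uses internally, but that is my assumption and it is not a public API:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class SaltDistributionCheck {

    public static void main(String[] args) {
        int parallelism = 8;       // number of subtasks
        int maxParallelism = 128;  // Flink's default max parallelism
        int originalKeys = 15;     // my key set is only around 10 ~ 20
        int saltRange = 4;         // small range of random values

        Map<Integer, Integer> keysPerSubtask = new HashMap<>();
        for (int key = 0; key < originalKeys; key++) {
            for (int salt = 0; salt < saltRange; salt++) {
                // composite key = (original key, salt), as it would look after keyBy
                Tuple2<String, Integer> compositeKey = Tuple2.of("key-" + key, salt);
                int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(
                        compositeKey, maxParallelism, parallelism);
                keysPerSubtask.merge(subtask, 1, Integer::sum);
            }
        }

        // With only ~60 composite keys, some subtasks end up with noticeably
        // more keys (and therefore more data) than others.
        for (Map.Entry<Integer, Integer> e : keysPerSubtask.entrySet()) {
            System.out.println("subtask " + e.getKey() + " -> " + e.getValue() + " keys");
        }
    }
}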

Best Regards,
Tony Wei

2017-09-19 22:56 GMT+08:00 Fabian Hueske <fh...@gmail.com>:

> Hi Tony,
>
> operator state can only be kept on the heap.
>
> One thing you could try is to add a random value to your data and keyBy on
> a composite key that consists of your original key and the random value.
> It is important, though, that you actually add the random value to your
> data to ensure that the extracted key is always the same, i.e.,
> deterministic with respect to the data.
> This should evenly distribute your data and allow you to use keyed
> MapState.
>
> Hope this helps,
> Fabian
>
> 2017-09-19 15:58 GMT+02:00 Tony Wei <to...@gmail.com>:
>
>> Hi,
>>
>> I have a basic streaming job that continuously persists data from Kafka
>> to S3.
>> The data are grouped by some dimensions, with a limited amount buffered
>> per group.
>>
>> Originally, I used 'keyBy' and keyed state to fulfill the requirement.
>> However, because the data is extremely skewed, I switched to a map
>> function that aggregates the data within each parallel partition, so that
>> I can balance the amount of data across subtasks.
>>
>> Inside the map function I use a HashMap to store data per dimension, and
>> I convert it to operator list state when 'snapshotState()' is called.
>> But that causes another problem: because I can't access operator list
>> state directly the way I can access keyed state on a KeyedStream, I have
>> to keep all of that state on the heap. That limits the amount of data I
>> can cache in the map function.
>>
>> I was wondering if there is any good suggestion for dealing with this
>> problem, or a better way to use operator list state in this scenario.
>> Thank you.
>>
>>
>> Best Regards,
>> Tony Wei
>>
>
>

Re: How to use operator list state like a HashMap?

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Tony,

operator state can only be kept on the heap.

One thing you could try is to add a random value to your data and keyBy on a
composite key that consists of your original key and the random value.
It is important, though, that you actually add the random value to your data
to ensure that the extracted key is always the same, i.e., deterministic
with respect to the data.
This should evenly distribute your data and allow you to use keyed MapState.
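
A rough sketch of what I mean is below. The SaltedEvent POJO, the field
names, and the counting done in the flatMap are all made up, just to show
where the random value and the keyed MapState would go:

import java.util.concurrent.ThreadLocalRandom;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;

public class SaltedKeySketch {

    // made-up POJO standing in for your record type
    public static class SaltedEvent {
        public String dimension;  // the original (skewed) key
        public int salt;          // random value stored in the record
        public String payload;
        public SaltedEvent() {}
    }

    public static DataStream<SaltedEvent> apply(DataStream<SaltedEvent> events) {
        final int numSalts = 4;  // how far to spread each original key

        return events
            // write the random value into the record once, so the key
            // extraction below only reads fields that are part of the record
            .map(new MapFunction<SaltedEvent, SaltedEvent>() {
                @Override
                public SaltedEvent map(SaltedEvent e) {
                    e.salt = ThreadLocalRandom.current().nextInt(numSalts);
                    return e;
                }
            })
            // composite key: original key + stored random value
            .keyBy("dimension", "salt")
            .flatMap(new RichFlatMapFunction<SaltedEvent, SaltedEvent>() {

                private transient MapState<String, Long> perKeyBuffer;

                @Override
                public void open(Configuration parameters) {
                    // keyed MapState is scoped to (dimension, salt) and lives
                    // in the configured state backend rather than in a HashMap
                    // on the operator's heap
                    perKeyBuffer = getRuntimeContext().getMapState(
                            new MapStateDescriptor<>(
                                    "per-key-buffer", String.class, Long.class));
                }

                @Override
                public void flatMap(SaltedEvent e, Collector<SaltedEvent> out)
                        throws Exception {
                    Long seen = perKeyBuffer.get(e.payload);
                    perKeyBuffer.put(e.payload, seen == null ? 1L : seen + 1);
                    out.collect(e);
                }
            });
    }
}

With this in place, the per-key buffering can go into keyed MapState
instead of operator list state.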

Hope this helps,
Fabian

2017-09-19 15:58 GMT+02:00 Tony Wei <to...@gmail.com>:

> Hi,
>
> I have a basic streaming job that continuously persists data from Kafka
> to S3.
> The data are grouped by some dimensions, with a limited amount buffered
> per group.
>
> Originally, I used 'keyBy' and keyed state to fulfill the requirement.
> However, because the data is extremely skewed, I switched to a map
> function that aggregates the data within each parallel partition, so that
> I can balance the amount of data across subtasks.
>
> Inside the map function I use a HashMap to store data per dimension, and
> I convert it to operator list state when 'snapshotState()' is called.
> But that causes another problem: because I can't access operator list
> state directly the way I can access keyed state on a KeyedStream, I have
> to keep all of that state on the heap. That limits the amount of data I
> can cache in the map function.
>
> I was wondering if there is any good suggestion for dealing with this
> problem, or a better way to use operator list state in this scenario.
> Thank you.
>
>
> Best Regards,
> Tony Wei
>