Posted to dev@flink.apache.org by Ken Krugler <kk...@transpac.com> on 2022/11/16 22:27:55 UTC

Stack Overflow Question re compressing broadcast state

Hi all,

Just posted this question on SO: How to enable compression for Flink broadcast state checkpoints <https://stackoverflow.com/q/74466988/231762?sem=2>
Basically it doesn’t look like broadcast state respects the compressed state (checkpoints/savepoints) setting, but I might be reading the code wrong.

Hoping someone (like Dawid Wysakowicz) can chime in here, thanks!

— Ken

--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch


Re: Stack Overflow Question re compressing broadcast state

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hey,

Would be great if you could help with any of that. I could definitely 
help with reviews.

Best,

Dawid

On 22/11/2022 01:17, Ken Krugler wrote:
> Hi Dawid,
>
> Thanks for following up on this.
>
> Let me know if you’d like me to update the documentation, that seems 
> pretty straightforward :)
>
> Adding support for compressing broadcast state feels like something 
> more challenging, but I could take a swing at it if you’d like.
>
> Regards,
>
> — Ken
>
> PS - re the key serializer, I was looking at a hack where I keep 
> around the previous record so I could do delta encoding…but that’s 
> also fragile.

Re: Stack Overflow Question re compressing broadcast state

Posted by Ken Krugler <kk...@transpac.com>.
Hi Dawid,

Thanks for following up on this.

Let me know if you’d like me to update the documentation, that seems pretty straightforward :)

Adding support for compressing broadcast state feels like something more challenging, but I could take a swing at it if you’d like.

Regards,

— Ken

PS - re the key serializer, I was looking at a hack where I keep around the previous record so I could do delta encoding…but that’s also fragile.
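
For readers unfamiliar with the delta-encoding idea in the PS, here is a minimal sketch in plain Java (not Flink code, and not the code Ken was actually working on). It assumes records are strings that tend to share a prefix with the previous record; the class name PrefixDeltaCodec and its methods are hypothetical. Each record is stored as the number of leading characters it shares with the previous record plus the remaining suffix:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only: prefix-based delta encoding of consecutive records.
class PrefixDeltaCodec {

    /** One encoded record: shared-prefix length relative to the previous record, plus the rest. */
    static final class Delta {
        final int sharedPrefix;
        final String suffix;

        Delta(int sharedPrefix, String suffix) {
            this.sharedPrefix = sharedPrefix;
            this.suffix = suffix;
        }
    }

    /** Encodes each record relative to the one before it (the first is relative to ""). */
    static List<Delta> encode(List<String> records) {
        List<Delta> out = new ArrayList<>();
        String previous = "";
        for (String current : records) {
            int limit = Math.min(previous.length(), current.length());
            int shared = 0;
            while (shared < limit && previous.charAt(shared) == current.charAt(shared)) {
                shared++;
            }
            out.add(new Delta(shared, current.substring(shared)));
            previous = current; // hidden state carried to the next record
        }
        return out;
    }

    /** Rebuilds the records; only correct if deltas arrive in the order they were encoded. */
    static List<String> decode(List<Delta> deltas) {
        List<String> out = new ArrayList<>();
        String previous = "";
        for (Delta d : deltas) {
            String current = previous.substring(0, d.sharedPrefix) + d.suffix;
            out.add(current);
            previous = current;
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("customer-1000", "customer-1001", "customer-1002");
        for (Delta d : encode(keys)) {
            System.out.println(d.sharedPrefix + " + \"" + d.suffix + "\"");
        }
    }
}
```

The sketch also shows where the fragility comes from: decode has to see every delta, starting from the first record, in exactly the order encode produced them. A serializer that keeps the previous record around relies on the runtime calling it in that strict sequence, which is an assumption and not something this thread confirms.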

> On Nov 21, 2022, at 1:36 AM, Dawid Wysakowicz <dw...@apache.org> wrote:
> 
>> And yes, I read “Compression works on the granularity of key-groups in keyed state” as meaning “When compressing keyed state, it’s done per key-group” and not “Compression only works on keyed state” :)
> 
> Totally agree. Docs could definitely be more straightforward about that. I created FLINK-30112[1] to improve that.
> 
>> I agree that “KeyedState should be preferred in majority of cases”.  Unfortunately for a broadcast stream there’s no option to use keyed state, right?
> 
> Absolutely, I mentioned this just as an excuse for why we did not invest much in OperatorState.
> 
>> So assuming that’s the current situation, and I’m not in a position to have my client deploy a patched version of Flink, which of the following (sketchy) ideas has any potential here…
> 
> I am afraid it won't be easy to hack the compression in. 
> 
> Your suggestion 2. won't help imo, as the compression would compress separately for each entry. I believe that's not what you're looking for. 
> 
> The option 1. has more potential, as it would apply compression for the entire state. As you said though, this would require class overloading which is always fragile.
> 
> To be honest, I can't think of a better way atm.
> 
> Btw, I believe being able to apply compression for operator state is a valid request so I created FLINK-30113[2]
> 
> [1] https://issues.apache.org/jira/browse/FLINK-30112
> [2] https://issues.apache.org/jira/browse/FLINK-30113

Re: Stack Overflow Question re compressing broadcast state

Posted by Dawid Wysakowicz <dw...@apache.org>.
    And yes, I read "Compression works on the granularity of key-groups in keyed state” as meaning “When compressing keyed state, it’s done per key-group” and not “Compression only works on keyed state” :)

Totally agree. Docs could definitely be more straightforward about that. 
I created FLINK-30112[1] to improve that.

    I agree that “KeyedState should be preferred in majority of cases”.  Unfortunately for a broadcast stream there’s no option to use keyed state, right?

Absolutely, I mentioned this just as an excuse for why we did not invest much 
in OperatorState.

    So assuming that’s the current situation, and I’m not in a position to have my client deploy a patched version of Flink, which of the following (sketchy) ideas has any potential here…

I am afraid it won't be easy to hack the compression in.

Your suggestion 2 won't help imo, as it would compress each entry 
separately. I believe that's not what you're looking for.

Option 1 has more potential, as it would apply compression to the 
entire state. As you said though, this would require class overloading, 
which is always fragile.

To be honest, I can't think of a better way atm.

Btw, I believe being able to apply compression for operator state is a 
valid request so I created FLINK-30113[2]

[1] https://issues.apache.org/jira/browse/FLINK-30112

[2] https://issues.apache.org/jira/browse/FLINK-30113
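
To illustrate why compressing each entry separately (suggestion 2) tends to help much less than compressing the entire state (option 1), here is a minimal sketch in plain Java. It is not Flink code: it assumes deflate from java.util.zip as a stand-in for whatever codec would actually be used, and the class name, method names, and sample records are all hypothetical. It compares the total size when every entry is compressed on its own with the size when all entries are concatenated and compressed as one stream:

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.Deflater;

// Hypothetical illustration only: per-entry versus whole-state compression.
class CompressionGranularity {

    /** Deflate-compresses the input and returns the compressed size in bytes. */
    static int compressedSize(byte[] input) {
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish();
        byte[] buffer = new byte[1024];
        int total = 0;
        while (!deflater.finished()) {
            total += deflater.deflate(buffer);
        }
        deflater.end();
        return total;
    }

    /** Made-up state entries: short records that look very much alike. */
    static List<byte[]> sampleEntries(int count) {
        List<byte[]> entries = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            String record = "customer-" + i + "|region=us-west|tier=standard";
            entries.add(record.getBytes(StandardCharsets.UTF_8));
        }
        return entries;
    }

    /** Total size when every entry is compressed on its own. */
    static int perEntrySize(List<byte[]> entries) {
        int total = 0;
        for (byte[] entry : entries) {
            total += compressedSize(entry);
        }
        return total;
    }

    /** Size when all entries are concatenated and compressed as a single stream. */
    static int wholeStateSize(List<byte[]> entries) {
        ByteArrayOutputStream all = new ByteArrayOutputStream();
        for (byte[] entry : entries) {
            all.write(entry, 0, entry.length);
        }
        return compressedSize(all.toByteArray());
    }

    public static void main(String[] args) {
        List<byte[]> entries = sampleEntries(1000);
        System.out.println("per entry:   " + perEntrySize(entries) + " bytes");
        System.out.println("whole state: " + wholeStateSize(entries) + " bytes");
    }
}
```

With per-entry compression the codec never sees the redundancy between entries, and each tiny stream also pays its own header and checksum overhead. A single stream over the whole state can exploit repetition across entries, which is the point made above about option 1.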

On 18/11/2022 02:13, Ken Krugler wrote:
> Hi Dawid,
>
> Thanks for getting back to me.
>
> And yes, I read "Compression works on the granularity of key-groups in keyed state” as meaning “When compressing keyed state, it’s done per key-group” and not “Compression only works on keyed state” :)
>
> I agree that “KeyedState should be preferred in majority of cases”.  Unfortunately for a broadcast stream there’s no option to use keyed state, right?
>
> So assuming that’s the current situation, and I’m not in a position to have my client deploy a patched version of Flink, which of the following (sketchy) ideas has any potential here…
>
> 1. Implement a version of HeapBroadcastState that compresses the state, and rely on Flink’s classloader finding it in my jar first.
>
> 2. Register a custom compressing serializer for my state’s key class, assuming that will get picked up by the call to stateMetaInfo.getKeySerializer().
>
> Or something else?
>
> Thanks!
>
> — Ken

Re: Stack Overflow Question re compressing broadcast state

Posted by Ken Krugler <kk...@transpac.com>.
Hi Dawid,

Thanks for getting back to me.

And yes, I read "Compression works on the granularity of key-groups in keyed state” as meaning “When compressing keyed state, it’s done per key-group” and not “Compression only works on keyed state” :)

I agree that “KeyedState should be preferred in majority of cases”.  Unfortunately for a broadcast stream there’s no option to use keyed state, right?

So assuming that’s the current situation, and I’m not in a position to have my client deploy a patched version of Flink, which of the following (sketchy) ideas has any potential here…

1. Implement a version of HeapBroadcastState that compresses the state, and rely on Flink’s classloader finding it in my jar first.

2. Register a custom compressing serializer for my state’s key class, assuming that will get picked up by the call to stateMetaInfo.getKeySerializer().

Or something else?

Thanks!

— Ken


> On Nov 17, 2022, at 12:06 AM, Dawid Wysakowicz <dw...@apache.org> wrote:
> 
> Cross posting answer from SO:
> 
> BroadcastState is an operator state not a KeyedState. The referenced docs refer to a KeyedState:
> 
> Compression works on the granularity of key-groups in keyed state,
> 
> Probably docs could be more explicit about this behaviour.
> 
> Unfortunately, as far as I know there is no compression for OperatorState. I am not 100% sure, but I believe it has just never been implemented, because we did not want to invest in it, as KeyedState should be preferred in majority of cases.
> 
> Best,
> 
> Dawid

Re: Stack Overflow Question re compressing broadcast state

Posted by Dawid Wysakowicz <dw...@apache.org>.
Cross posting answer from SO:

BroadcastState is an operator state, not a KeyedState. The referenced 
docs refer to KeyedState:

    Compression works on the granularity of key-groups in keyed state,

Probably docs could be more explicit about this behaviour.

Unfortunately, as far as I know there is no compression for 
OperatorState. I am not 100% sure, but I believe it has just never 
been implemented, because we did not want to invest in it, as 
KeyedState should be preferred in majority of cases.

Best,

Dawid

On 16/11/2022 23:27, Ken Krugler wrote:
> Hi all,
>
> Just posted this question on SO: How to enable compression for Flink 
> broadcast state checkpoints 
> <https://stackoverflow.com/q/74466988/231762?sem=2>
>
> Basically it doesn’t look like broadcast state respects the compressed 
> state (checkpoints/savepoints) setting, but I might be reading the 
> code wrong.
>
> Hoping someone (like Dawid Wysakowicz) can chime in here, thanks!
>
> — Ken