Posted to user@flink.apache.org by Elias Levy <fe...@gmail.com> on 2017/08/21 20:23:09 UTC

Global State and Scaling

I am implementing a control stream.  The stream communicates a global
configuration value for the whole job.  It uses DataStream.broadcast() to
communicate this to all parallel operator instances.  I would like to save
this value in state so that it can be recovered when the job
restarts/recovers.  The control stream is not keyed, so the only option is
Operator state.

I could implement this using the ListCheckpointed interface, returning
Collections.singletonList(configValue) from snapshotState.  It is clear
what I'd need to do in restoreState in the case of scale in.  If I include
a serial number in the config, and an instance receives multiple values on
restore, it can keep the config value with the largest serial number,
indicating the latest config.
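
The selection step described above could be sketched roughly as follows.
This is plain Java with no Flink dependency; ConfigValue, its serial and
payload fields, and LatestConfig are hypothetical names made up for
illustration.  The idea is that restoreState would pass its list to
something like latest().

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/** Hypothetical config value carrying a serial number; not a Flink type. */
final class ConfigValue {
    final long serial;
    final String payload;

    ConfigValue(long serial, String payload) {
        this.serial = serial;
        this.payload = payload;
    }
}

final class LatestConfig {
    /**
     * Returns the restored value with the largest serial number,
     * or an empty Optional when the restored list is empty.
     */
    static Optional<ConfigValue> latest(List<ConfigValue> restored) {
        return restored.stream()
                .max(Comparator.comparingLong((ConfigValue c) -> c.serial));
    }

    public static void main(String[] args) {
        // Scale in: one instance is handed the values of several old instances.
        List<ConfigValue> restored = Arrays.asList(
                new ConfigValue(3, "old"),
                new ConfigValue(7, "new"),
                new ConfigValue(5, "older"));
        System.out.println(latest(restored).map(c -> c.payload).orElse("<none>"));
    }
}
```

Returning an Optional keeps the empty-list case explicit, so the caller
has to decide what to do when nothing was restored.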

Alas, it is not clear what should happen on scale out, as some operator
instances will receive empty lists.

It seems the other alternative is to use CheckpointedFunction, along with
union redistribution via getUnionListState, and then have each operator
instance select from the union list the config with the latest serial
number, of which there should be multiple copies.  But this seems like an
ugly hack.
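
To make the difference between the two redistribution schemes concrete,
here is a toy model in plain Java.  It is not Flink code: the round-robin
dealing in evenSplit is an assumed simplification of how list state gets
split, and RedistributionModel is a made-up name.  It only illustrates
why scale out leaves some instances empty under even-split but not under
union.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of operator list-state redistribution; not Flink's implementation. */
final class RedistributionModel {

    /** Even-split model: deal the combined elements round-robin to the new instances. */
    static <T> List<List<T>> evenSplit(List<T> combined, int newParallelism) {
        List<List<T>> out = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            out.add(new ArrayList<>());
        }
        for (int i = 0; i < combined.size(); i++) {
            out.get(i % newParallelism).add(combined.get(i));
        }
        return out;
    }

    /** Union model: every new instance receives a copy of the complete combined list. */
    static <T> List<List<T>> union(List<T> combined, int newParallelism) {
        List<List<T>> out = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            out.add(new ArrayList<>(combined));
        }
        return out;
    }
}
```

With two old instances each snapshotting one element and a new
parallelism of four, evenSplit leaves two instances with empty lists,
while union hands both copies to all four, so each can pick the latest.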


In addition, the documentation is unclear on the relationship and effect,
if any, of the maximum parallelism Flink job parameter on operator state,
whereas it is much clearer in this regard as it relates to keyed state via
key groups.


How are folks handling this use case, i.e. storing and restoring global
config values via Flink state?

Re: Global State and Scaling

Posted by Till Rohrmann <tr...@apache.org>.
Hi Elias,

You're right, we currently don't support proper broadcast state. We hope to
add support for this in the near future.

The maximum parallelism only affects the keyed state because it defines how
many key groups there are. The key groups are the smallest unit of state
which can be re-partitioned (e.g. due to scaling up/down).
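
As a rough illustration of the key group idea, here is a small model in
plain Java.  KeyGroupModel is a made-up name, and using the raw hashCode
is an assumed simplification (Flink, as far as I know, applies an extra
hash on top of it).  The index formula is meant to mirror the way key
groups are mapped onto operator instances.

```java
/** Simplified model of key groups; not Flink's actual classes. */
final class KeyGroupModel {

    /** The key group depends only on the key and the maximum parallelism. */
    static int keyGroupFor(Object key, int maxParallelism) {
        // Simplification: raw hashCode stands in for Flink's hashing.
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Maps a key group to an operator instance for the current parallelism. */
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int keyGroup = keyGroupFor("some-key", maxParallelism);
        // Rescaling changes which instance owns the key group, not the key group itself.
        System.out.println("parallelism 2 -> instance "
                + operatorIndexFor(keyGroup, maxParallelism, 2));
        System.out.println("parallelism 4 -> instance "
                + operatorIndexFor(keyGroup, maxParallelism, 4));
    }
}
```

Since a key never changes its key group, rescaling only has to move whole
key groups between instances, and the number of key groups caps how far
the job can scale.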

Cheers,
Till


Re: Global State and Scaling

Posted by Elias Levy <fe...@gmail.com>.
Looks like Gerard asked something along similar lines
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Managed-operator-state-treating-state-of-all-parallel-operators-as-the-same-td14102.html>
just last month and that there is a JIRA
<https://issues.apache.org/jira/browse/FLINK-4940> for official support for
broadcast state.  Looks like the ugly hack is the way to go for now.