Posted to issues@flink.apache.org by "Aljoscha Krettek (JIRA)" <ji...@apache.org> on 2017/04/03 12:34:42 UTC

[jira] [Updated] (FLINK-4940) Add support for broadcast state

     [ https://issues.apache.org/jira/browse/FLINK-4940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aljoscha Krettek updated FLINK-4940:
------------------------------------
    Component/s:     (was: Streaming)
                 DataStream API

> Add support for broadcast state
> -------------------------------
>
>                 Key: FLINK-4940
>                 URL: https://issues.apache.org/jira/browse/FLINK-4940
>             Project: Flink
>          Issue Type: Sub-task
>          Components: DataStream API
>            Reporter: Aljoscha Krettek
>
> As mentioned in https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API we need broadcast state to support job patterns where one (or several) inputs are broadcast to all operator instances and where we keep state that is mutated only based on input from the broadcast inputs. This special restriction ensures that the broadcast state is the same on all parallel operator instances when checkpointing (except when using at-least-once mode). We therefore only have to checkpoint the state of one arbitrary instance, for example instance 0.
> For the different types of side inputs we need different types of state. Luckily, the side input types align with the state types we currently have for keyed state:
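A rough sketch of the "checkpoint only one instance" idea above, in plain Java with no Flink dependency. The class and method names (BroadcastCheckpointSketch, snapshot, restore) are hypothetical and not part of any Flink API, and handing a copy of the snapshot to every instance on restore is an assumption about how recovery could work, not something the issue specifies.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper for illustration only; none of these names are Flink APIs. */
final class BroadcastCheckpointSketch {

    /**
     * Snapshots broadcast state by copying the state of instance 0 only.
     * This is valid only if all instances hold identical broadcast state.
     */
    static Map<String, String> snapshot(List<Map<String, String>> statePerInstance) {
        return new HashMap<>(statePerInstance.get(0));
    }

    /**
     * Restores by giving every instance its own copy of the snapshot
     * (assumed behaviour; it would also cover a changed parallelism).
     */
    static List<Map<String, String>> restore(Map<String, String> snapshot, int parallelism) {
        List<Map<String, String>> restored = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            restored.add(new HashMap<>(snapshot));
        }
        return restored;
    }
}
```

Because every instance receives an independent copy, the sketch only stays consistent as long as later mutations are driven purely by broadcast input, which is the restriction described above.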
>  - {{ValueState}}
>  - {{ListState}}
>  - {{MapState}}
> We can therefore reuse the keyed state backends for our purposes, but we need to put a more restrictive API in front of them: mutation of broadcast state must only be allowed while actually processing broadcast input. Without this check, users can (by mistake) modify broadcast state while processing non-broadcast input. This would lead to incorrect results that are very hard to notice, much less debug.
> With the way the Flink state API works (users can get a {{State}} in {{open()}} and work with state by calling methods on that) we have to add special wrapping state classes that only allow modification of state when processing a broadcast element.
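The wrapping idea above could look roughly like the following sketch. It is plain Java and deliberately does not use Flink's real state interfaces: SimpleValueState, InMemoryValueState, BroadcastContext and GuardedValueState are all hypothetical names introduced only for illustration. The wrapper lets reads through at any time but rejects updates unless a broadcast element is currently being processed.

```java
/** Hypothetical minimal stand-in for a value state; not Flink's ValueState interface. */
interface SimpleValueState<T> {
    T value();
    void update(T newValue);
}

/** Plain in-memory state, used as the wrapped delegate in this sketch. */
final class InMemoryValueState<T> implements SimpleValueState<T> {
    private T current;

    @Override
    public T value() {
        return current;
    }

    @Override
    public void update(T newValue) {
        current = newValue;
    }
}

/** Tracks whether the operator is currently processing a broadcast element. */
final class BroadcastContext {
    private boolean processingBroadcast;

    boolean isProcessingBroadcast() {
        return processingBroadcast;
    }

    /** Runs the action with the flag set, and always clears the flag afterwards. */
    void runAsBroadcast(Runnable action) {
        processingBroadcast = true;
        try {
            action.run();
        } finally {
            processingBroadcast = false;
        }
    }
}

/** Wrapper that allows reads at any time but updates only during broadcast processing. */
final class GuardedValueState<T> implements SimpleValueState<T> {
    private final SimpleValueState<T> delegate;
    private final BroadcastContext context;

    GuardedValueState(SimpleValueState<T> delegate, BroadcastContext context) {
        this.delegate = delegate;
        this.context = context;
    }

    @Override
    public T value() {
        return delegate.value();
    }

    @Override
    public void update(T newValue) {
        if (!context.isProcessingBroadcast()) {
            throw new IllegalStateException(
                    "Broadcast state may only be modified while processing a broadcast element");
        }
        delegate.update(newValue);
    }
}
```

In this sketch an operator would wrap its broadcast-element handling in runAsBroadcast(...), so a state handle obtained earlier (for example in open()) fails fast if user code tries to update it from the non-broadcast path.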
> For the API, I propose to add a new interface `InternalStateAccessor`:
> {code}
> /**
>  * Interface for accessing persistent state.
>  */
> @PublicEvolving
> public interface InternalStateAccessor {
>   <N, S extends State> S state(
> 			N namespace,
> 			TypeSerializer<N> namespaceSerializer,
> 			StateDescriptor<S, ?> stateDescriptor);
> }
> {code}
> This is the same as `KeyedStateBackend.getPartitionedState()` but allows us to abstract away the special nature of broadcast state. It is also meant as an internal interface and is not to be exposed to user functions; only operators should deal with it.
> {{AbstractStreamOperator}} would get a new method `getBroadcastStateAccessor()` that returns an implementation of this interface. The implementation would have a {{KeyedStateBackend}} but wrap the state in special wrappers that only allow modification when processing broadcast elements (as mentioned above). 
> On the lower implementation levels, we have to add a new entry for our state to `OperatorSnapshotResult`. For example:
> {code}
> private RunnableFuture<KeyGroupsStateHandle> broadcastStateManagedFuture;
> {code}
> Also, the {{CheckpointCoordinator}} and the savepoint/checkpoint serialisation logic will have to be adapted to support this new kind of state. Given the ongoing changes to support incremental snapshotting and other new features for `KeyedStateBackend`, this should be coordinated with [~StephanEwen] and/or [~stefanrichter83@gmail.com] and/or [~xiaogang.shi]. We also have to be very careful about maintaining compatibility with savepoints from older versions.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)