Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2016/01/29 16:07:40 UTC

[jira] [Commented] (FLINK-3201) Enhance Partitioned State Interface with State Types

    [ https://issues.apache.org/jira/browse/FLINK-3201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15123556#comment-15123556 ] 

ASF GitHub Bot commented on FLINK-3201:
---------------------------------------

GitHub user aljoscha opened a pull request:

    https://github.com/apache/flink/pull/1562

    Enhance Partitioned State and use it in WindowOperator

    The commits in this PR are self-contained. The first one enhances the partitioned state so that it can be used by the WindowOperator. The second commit adds the required changes to the WindowOperator. The third commit adds a state backend based on RocksDB. This means that the window operator can now run on different types of state backends transparently.
    
    @StephanEwen has some changes he wants to make; those will be added here (I think). I'm opening this now so that people can have a look at it.
    
    ## [FLINK-3201] Enhance Partitioned State Interface with State Types
    Add new state types ValueState, ListState and ReducingState, where
    ListState and ReducingState extend the MergingState interface.
    
    ValueState behaves exactly like OperatorState. MergingState is a
    stateful list to which elements can be added and whose contents can
    be retrieved. A ListState actually keeps the list of elements, whereas
    a ReducingState uses a reduce function to combine all added elements
    into one. To create a ValueState the user passes a ValueStateIdentifier
    to StreamingRuntimeContext.getPartitionedState(); for the other state
    types they pass a ListStateIdentifier or a ReducingStateIdentifier.
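To make the difference between the state types concrete, here is a minimal, hypothetical in-memory sketch in Java. The type names follow the description above, but the method names (`value`, `update`, `add`, `get`, `clear`), the `Heap*` implementation classes and the use of `BinaryOperator` as the reduce function are assumptions for illustration, not the interfaces added in this pull request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical sketch; method names and signatures are illustrative assumptions.

// A single value that can be set and retrieved (like OperatorState).
interface ValueState<T> {
    T value();
    void update(T newValue);
}

// Common supertype: elements can be added and the combined contents read back.
interface MergingState<IN, OUT> {
    void add(IN element);
    OUT get();
    void clear();
}

// ListState keeps every element that was added.
interface ListState<T> extends MergingState<T, Iterable<T>> {}

// ReducingState folds every added element into a single value.
interface ReducingState<T> extends MergingState<T, T> {}

final class HeapValueState<T> implements ValueState<T> {
    private T current;
    public T value() { return current; }
    public void update(T newValue) { current = newValue; }
}

final class HeapListState<T> implements ListState<T> {
    private final List<T> elements = new ArrayList<>();
    public void add(T element) { elements.add(element); }
    public Iterable<T> get() { return elements; }
    public void clear() { elements.clear(); }
}

final class HeapReducingState<T> implements ReducingState<T> {
    private final BinaryOperator<T> reduceFunction;
    private T reduced; // null until the first element is added

    HeapReducingState(BinaryOperator<T> reduceFunction) {
        this.reduceFunction = reduceFunction;
    }

    public void add(T element) {
        // Combine the new element with the running result instead of storing it.
        reduced = (reduced == null) ? element : reduceFunction.apply(reduced, element);
    }
    public T get() { return reduced; }
    public void clear() { reduced = null; }
}
```

Because both `HeapListState<Integer>` and `HeapReducingState<Integer>` are `MergingState<Integer, ?>`, code that only adds elements can work with either; only the consumer of `get()` needs to know whether it receives every element or a single reduced value.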
    
    This change is necessary to give the system more information about the
    nature of the operator state, which we need in order to do incremental
    snapshots. That would not be possible if, for example, the user kept a
    List as state: inside an OperatorState the list is opaque, so Flink
    could not create good incremental snapshots of it.
    
    This also refactors the StateBackend. Before, the logic for partitioned
    state was spread out over StreamingRuntimeContext,
    AbstractStreamOperator and StateBackend. Now it is consolidated in
    StateBackend.
    
    This also adds support for partitioned state in two-input operators.
    
    ## [FLINK-3200] Use Partitioned State in WindowOperator
    
    This changes the window operator to use the new partitioned state
    abstraction for keeping window contents instead of custom internal
    state and the Checkpointed interface.
    
    For now, timers are still kept as custom checkpointed state, however.
    
    WindowOperator now expects a StateIdentifier for a MergingState. This
    can be an identifier for either a ReducingState or a ListState;
    WindowOperator itself is agnostic to the type of state. The signature
    of WindowFunction also changes to include the type of the intermediate
    input: if a ReducingState is used, the input of the WindowFunction is T
    (where T is the input type); if a ListState is used, the input is of
    type Iterable[T].
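The following hypothetical Java sketch illustrates why the window operator can stay agnostic to the state type: the operator only sees a `MergingState<IN, ACC>`, and the window function's input type `ACC` is either `T` (reducing) or `Iterable<T>` (list). `TumblingWindowOperator`, `ListBuffer`, `Reducer`, the tumbling-window assignment and the use of `java.util.function.Function` as the window function are all assumptions made for illustration; the real WindowOperator also handles triggers, timers and checkpointing, which are omitted here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical sketch; none of these names are Flink's real API.
interface MergingState<IN, ACC> {
    void add(IN element);
    ACC get();
}

// List-style state: the window function receives all elements.
final class ListBuffer<T> implements MergingState<T, Iterable<T>> {
    private final List<T> elements = new ArrayList<>();
    public void add(T e) { elements.add(e); }
    public Iterable<T> get() { return elements; }
}

// Reducing-style state: the window function receives one pre-aggregated value.
final class Reducer<T> implements MergingState<T, T> {
    private final BinaryOperator<T> fn;
    private T acc;
    Reducer(BinaryOperator<T> fn) { this.fn = fn; }
    public void add(T e) { acc = (acc == null) ? e : fn.apply(acc, e); }
    public T get() { return acc; }
}

// IN: element type, ACC: what the state hands to the window function, OUT: result.
final class TumblingWindowOperator<IN, ACC, OUT> {
    private final long windowSize;
    private final Supplier<MergingState<IN, ACC>> stateFactory;
    private final Function<ACC, OUT> windowFunction;
    private final Map<Long, MergingState<IN, ACC>> windows = new HashMap<>();

    TumblingWindowOperator(long windowSize,
                           Supplier<MergingState<IN, ACC>> stateFactory,
                           Function<ACC, OUT> windowFunction) {
        this.windowSize = windowSize;
        this.stateFactory = stateFactory;
        this.windowFunction = windowFunction;
    }

    // Add the element to the window [start, start + windowSize) containing its timestamp.
    void processElement(IN element, long timestamp) {
        long windowStart = timestamp - Math.floorMod(timestamp, windowSize);
        windows.computeIfAbsent(windowStart, w -> stateFactory.get()).add(element);
    }

    // Emit the window result and drop its state; the operator never inspects ACC itself.
    OUT fire(long windowStart) {
        MergingState<IN, ACC> state = windows.remove(windowStart);
        return (state == null) ? null : windowFunction.apply(state.get());
    }
}
```

With `() -> new Reducer<Integer>(Integer::sum)` as the state factory, the window function receives a single `Integer` per window; passing `() -> new ListBuffer<Integer>()` instead makes it receive an `Iterable<Integer>`, with no change to the operator class.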

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/flink window-on-state

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1562.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1562
    
----
commit dc440ba4d97cecd297e432f14342dba5382cab50
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2016-01-25T11:33:51Z

    [FLINK-3201] Enhance Partitioned State Interface with State Types
    
commit 865723e3ff0133ba9d921907c298c3545fdfe32c
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2016-01-25T11:34:05Z

    [FLINK-3200] Use Partitioned State in WindowOperator
    
commit 1034a02380652ef5184e0094a41ef073c7c9b4fd
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2016-01-21T09:56:47Z

    [FLINK-3278] Add Partitioned State Backend Based on RocksDB

commit 13d1f541a48f82b1a10854addbdd5ea9bde7b079
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2016-01-28T17:18:26Z

    Move RocksDB backup to external processes

----


> Enhance Partitioned State Interface with State Types
> ----------------------------------------------------
>
>                 Key: FLINK-3201
>                 URL: https://issues.apache.org/jira/browse/FLINK-3201
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Streaming
>    Affects Versions: 1.0.0
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>
> We should enhance the partitioned state with different state types, so that the system knows about the semantics of the State. I propose for now:
> - ValueState, this behaves like the current OperatorState: state is one value that can be set and retrieved
> - ListState, state is a list that can be appended to and iterated over
> - ReducingState, state is one value that other values can be added to
> ListState and ReducingState would share a common superclass to allow them to be used in the same places. For example, the WindowOperator would use ReducingState and ListState interchangeably, depending on whether we have a ReduceFunction or not.
> These additions would allow the system to be smarter about how state is checkpointed in the future; think managed memory/out-of-core state and incremental checkpoints.
> Also, state should be scoped to both a key and a namespace. This will allow the WindowOperator to use the interface. Right now, WindowOperator has a custom state implementation that uses a two-level Map (by key and by window). In this case the window would be the namespace.
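The key/namespace scoping proposed in the issue can be sketched as a single map indexed by a composite (key, namespace) pair, which flattens the two-level Map (by key, then by window) that the WindowOperator currently uses. This is a hypothetical illustration: the class `KeyedNamespacedValueState` and its `setCurrentKey`/`setCurrentNamespace`/`value`/`update`/`clear` methods are assumptions, not Flink's API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch: value state scoped to both a key and a namespace (e.g. a window).
// The caller sets the current key and namespace before each access.
final class KeyedNamespacedValueState<K, N, V> {

    // Composite (key, namespace) lookup key replacing a nested Map<K, Map<N, V>>.
    private static final class Scope {
        private final Object key;
        private final Object namespace;

        Scope(Object key, Object namespace) {
            this.key = key;
            this.namespace = namespace;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Scope)) return false;
            Scope other = (Scope) o;
            return Objects.equals(key, other.key) && Objects.equals(namespace, other.namespace);
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, namespace);
        }
    }

    private final Map<Scope, V> values = new HashMap<>();
    private K currentKey;
    private N currentNamespace;

    void setCurrentKey(K key) { currentKey = key; }
    void setCurrentNamespace(N namespace) { currentNamespace = namespace; }

    // All accessors operate on the value for the current (key, namespace) pair only.
    V value() { return values.get(new Scope(currentKey, currentNamespace)); }
    void update(V v) { values.put(new Scope(currentKey, currentNamespace), v); }
    void clear() { values.remove(new Scope(currentKey, currentNamespace)); }
}
```

A window operator using such state would set the namespace to the current window before touching it, so the same key can hold an independent value for each window it belongs to.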


