Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2016/09/19 13:58:20 UTC

[jira] [Commented] (FLINK-4379) Add Rescalable Non-Partitioned State

    [ https://issues.apache.org/jira/browse/FLINK-4379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15503567#comment-15503567 ] 

ASF GitHub Bot commented on FLINK-4379:
---------------------------------------

GitHub user StefanRRichter opened a pull request:

    https://github.com/apache/flink/pull/2512

    Partitionable op state

    This pull request introduces rescalable non-partitioned operator state as described in issue [FLINK-4379] and makes the following major changes:
    
    # 1) Introducing interfaces `PartitionableStateBackend` and `SnapshotProvider`
    
    For rescaling purposes, non-partitioned state is now stored in a `PartitionableStateBackend`, which provides a method to register operator states under unique names and store state partitions in a list state:
    
    ```
    <S> ListState<S> getPartitionableState(
    	String name,
    	Collection<PartitionableOperatorStateHandle> restoreSnapshots,
    	TypeSerializer<S> partitionStateSerializer) throws Exception;
    ```
    
    Note that a `TypeSerializer` is provided on a per-state basis, so that backward compatibility can be maintained if the serialization format changes.
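
    As a rough illustration of the registration semantics described above, the following sketch models a backend that hands out one list per unique state name. It is a simplified, Flink-free stand-in: the class `NamedListStateRegistry` and the method `registeredNames()` are hypothetical, and the restore handles and `TypeSerializer` argument of the real signature are omitted for brevity.

    ```java
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    /** Hypothetical stand-in for a partitionable state backend; not the Flink API. */
    public class NamedListStateRegistry {

        // One list of state partitions per registered state name.
        private final Map<String, List<?>> statesByName = new HashMap<>();

        /** Returns the list registered under the name, creating it on first access. */
        @SuppressWarnings("unchecked")
        public <S> List<S> getPartitionableState(String name) {
            // Unchecked cast: the caller is trusted to use one element type per name.
            return (List<S>) statesByName.computeIfAbsent(name, k -> new ArrayList<Object>());
        }

        /** Names of all states registered so far. */
        public Set<String> registeredNames() {
            return statesByName.keySet();
        }
    }
    ```

    Asking twice for the same name returns the same list in this sketch, which is the property that lets store and restore code agree on where a given state lives.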
    
    Furthermore, we introduce `SnapshotProvider` which offers a method to create snapshots:
    
    ```
    RunnableFuture<S> snapshot(
    	long checkpointId,
    	long timestamp,
    	CheckpointStreamFactory streamFactory) throws Exception;
    ```
    
    `DefaultPartitionableStateBackend` implements both `PartitionableStateBackend` and `SnapshotProvider`: the first interface provides the view that is exposed to user code, while the latter is used only by the system to trigger snapshots.
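
    The split between a user-facing view and a system-facing snapshot hook might look roughly like the sketch below. It relies only on `java.util.concurrent`; the class `SnapshotSketch`, the method `getState`, and the use of a copied in-memory map as the snapshot result are assumptions made for illustration, whereas the pull request writes through a `CheckpointStreamFactory`.

    ```java
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.RunnableFuture;

    /** Hypothetical sketch: one object exposes a state view and a snapshot hook. */
    public class SnapshotSketch {

        private final Map<String, List<String>> states = new HashMap<>();

        /** User-facing view: register or look up a list state by name. */
        public List<String> getState(String name) {
            return states.computeIfAbsent(name, k -> new ArrayList<>());
        }

        /**
         * System-facing hook. The ids are unused here; a real backend would
         * tag the written data with them.
         */
        public RunnableFuture<Map<String, List<String>>> snapshot(long checkpointId, long timestamp) {
            // Copy synchronously so later updates do not leak into this checkpoint.
            final Map<String, List<String>> copy = new HashMap<>();
            for (Map.Entry<String, List<String>> entry : states.entrySet()) {
                copy.put(entry.getKey(), new ArrayList<>(entry.getValue()));
            }
            // The returned task stands in for the (possibly asynchronous) write step.
            return new FutureTask<Map<String, List<String>>>(() -> copy);
        }
    }
    ```

    Returning a `RunnableFuture` leaves it to the caller to decide whether the write step runs inline or on another thread.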
    
    Similar to other state backends (e.g. `KeyedStateBackend`), a `DefaultPartitionableStateBackend` can be created and restored through the class `AbstractStateBackend`:
    
    ```
    public DefaultPartitionableStateBackend createPartitionableStateBackend(
    	Environment env,
    	String operatorIdentifier)
    
    public DefaultPartitionableStateBackend restorePartitionableStateBackend(
    	Environment env,
    	String operatorIdentifier,
    	Collection<PartitionableOperatorStateHandle> restoreSnapshots)
    ```
    
    # 2) Interface `PartitionableCheckpointed` as replacement for `Checkpointed`
    
    User code can use rescalable non-partitioned state by implementing the `PartitionableCheckpointed` interface. This interface is meant to replace the `Checkpointed` interface, which would become deprecated.
    
    ```
    void storeOperatorState(long checkpointId, PartitionableStateBackend backend) throws Exception;
    
    void restoreOperatorState(PartitionableStateBackend backend) throws Exception;
    ```
    
    Through the store/restore methods, user code can interact with `PartitionableStateBackend` and register/restore states as previously explained.
    
    Methods from this interface are invoked in `StreamTask::performCheckpoint(...)` and `StreamTask::restoreState()` respectively.
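
    To make the store/restore contract more concrete, here is a minimal sketch of a buffering function that saves and reloads its pending elements. It does not use Flink classes: the nested `Backend` type, its `getListState` method, and the state name `pending-elements` are hypothetical stand-ins for `PartitionableStateBackend` and a registered state.

    ```java
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    /** Hypothetical, Flink-free sketch of a function using store/restore hooks. */
    public class BufferingFunctionSketch {

        /** Stand-in for the backend view handed to user code. */
        public static class Backend {
            private final Map<String, List<String>> states = new HashMap<>();

            public List<String> getListState(String name) {
                return states.computeIfAbsent(name, k -> new ArrayList<>());
            }
        }

        // Elements received but not yet emitted; this is the state to checkpoint.
        private final List<String> pending = new ArrayList<>();

        public void invoke(String element) {
            pending.add(element);
        }

        public List<String> pending() {
            return pending;
        }

        /** Mirrors storeOperatorState: each pending element becomes one list entry. */
        public void storeOperatorState(long checkpointId, Backend backend) {
            List<String> state = backend.getListState("pending-elements");
            state.clear();
            state.addAll(pending);
        }

        /** Mirrors restoreOperatorState: reload whichever entries this instance received. */
        public void restoreOperatorState(Backend backend) {
            pending.clear();
            pending.addAll(backend.getListState("pending-elements"));
        }
    }
    ```

    Storing each element as a separate list entry, rather than one opaque blob, is what would allow the entries to be split across a different number of subtasks on restore.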
    
    # 3) Interface `StateRepartitioner`
    
    This interface allows implementing repartitioning strategies that redistribute non-partitioned state when the parallelism changes. Currently, only one default strategy is implemented, in `RoundRobinStatePartitioner`, but other strategies are possible in general, e.g. a `StatePartitioner` that distributes the union of the states of all previous parallel subtasks to each new parallel subtask. This interface is used in `CheckpointCoordinator::restoreLatestCheckpointedState()`.
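
    The two strategies mentioned above can be sketched on plain lists as follows. This is only an illustration under simplifying assumptions: `RepartitionSketch` and its methods are hypothetical, state partitions are modeled as list elements rather than state handles, and the actual `RoundRobinStatePartitioner` may order or group partitions differently.

    ```java
    import java.util.ArrayList;
    import java.util.List;

    /** Hypothetical sketch of two redistribution strategies; not the PR's classes. */
    public class RepartitionSketch {

        /** Deals all previous state partitions, in order, across the new subtasks. */
        public static <T> List<List<T>> roundRobin(List<List<T>> previous, int newParallelism) {
            List<List<T>> result = emptyAssignments(newParallelism);
            int next = 0;
            for (List<T> subtaskState : previous) {
                for (T partition : subtaskState) {
                    result.get(next).add(partition);
                    next = (next + 1) % newParallelism;
                }
            }
            return result;
        }

        /** Gives every new subtask the union of all previous state partitions. */
        public static <T> List<List<T>> union(List<List<T>> previous, int newParallelism) {
            List<T> all = new ArrayList<>();
            for (List<T> subtaskState : previous) {
                all.addAll(subtaskState);
            }
            List<List<T>> result = emptyAssignments(newParallelism);
            for (List<T> assignment : result) {
                assignment.addAll(all);
            }
            return result;
        }

        // Creates one empty, mutable assignment list per new subtask.
        private static <T> List<List<T>> emptyAssignments(int parallelism) {
            if (parallelism <= 0) {
                throw new IllegalArgumentException("parallelism must be positive");
            }
            List<List<T>> result = new ArrayList<>(parallelism);
            for (int i = 0; i < parallelism; i++) {
                result.add(new ArrayList<>());
            }
            return result;
        }
    }
    ```

    For example, scaling two subtasks holding `[1, 2, 3]` and `[4, 5]` up to three subtasks yields `[1, 4]`, `[2, 5]`, `[3]` with `roundRobin`, while `union` gives each of the three subtasks `[1, 2, 3, 4, 5]`.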
    
    TODO: After review, a description of the new features must still be added to the Flink documentation.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink partitionable-op-state

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2512.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2512
    
----
commit d133b948b8a201178e4ad3afa49e715bebf8eb5f
Author: Stefan Richter <s....@data-artisans.com>
Date:   2016-08-31T21:59:27Z

    State backend infrastructure to support partitionable op state.
    
    Introducing CheckpointStateHandles as container for all checkpoint related state handles
    
    tmp repartitioning function wip

commit 7213938be4bd706b57128c49cb24b24450ea1c87
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-09-07T16:52:48Z

    Add task chain length to TaskState

commit b639c8c62f64f840b95cd6191d7adf514eecddc6
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-09-08T21:11:11Z

    Forward partitionable operator state to PartitionableCheckpointed operators

commit 090d27f6daa173b43f1c5d2ee19cc661359b925a
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-09-08T22:05:13Z

    Make Kafka sources re-partitionable by using the partitionable non-keyed state
    
    Allow Kafka sources to use repartitionable state
    
    Remove CheckpointedAsynchronously from FlinkKafkaConsumerBase

commit bc964d7ad6c1608d8bae0db67efdb70c4c278086
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-09-09T13:34:38Z

    Make Kafka producer re-partitionable by removing the Checkpointed interface

commit 2366feeff842c3db39625da5a5912a98e83692db
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-09-09T17:02:32Z

    Fix HeapKeyedStateBackend.restorePartitionedState

commit e71cd8629ac1f195a53749d3303a7acf3cc4e4dd
Author: Stefan Richter <s....@data-artisans.com>
Date:   2016-09-15T16:42:55Z

    Partitionable State temporary WIP

commit e84998d2702b9461f59df14c5aa1bb8d7438457a
Author: Stefan Richter <s....@data-artisans.com>
Date:   2016-09-16T12:22:47Z

    Introduce partitionable state in savepoint serializer

commit a50629cb8eb876dfcd1af2199e4f4c08bc8562ad
Author: Stefan Richter <s....@data-artisans.com>
Date:   2016-09-16T12:37:34Z

    Using known chain length to replace maps with list of known size

commit 2921b478cbc9cadc2c205d8ee66c9f66c11245f1
Author: Stefan Richter <s....@data-artisans.com>
Date:   2016-09-16T17:56:02Z

    Added test case for rescaling partitionable state

commit 26b48fedc648715e0ed3e5a4642bede91c86ff02
Author: Stefan Richter <s....@data-artisans.com>
Date:   2016-09-19T10:16:26Z

    Code cleanup, minor refinements and documentation

----


> Add Rescalable Non-Partitioned State
> ------------------------------------
>
>                 Key: FLINK-4379
>                 URL: https://issues.apache.org/jira/browse/FLINK-4379
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>            Reporter: Ufuk Celebi
>            Assignee: Stefan Richter
>
> This issue is associated with [FLIP-8| https://cwiki.apache.org/confluence/display/FLINK/FLIP-8%3A+Rescalable+Non-Partitioned+State].


