You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2016/08/16 14:59:20 UTC

[jira] [Commented] (FLINK-3755) Introduce key groups for key-value state to support dynamic scaling

    [ https://issues.apache.org/jira/browse/FLINK-3755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422848#comment-15422848 ] 

ASF GitHub Bot commented on FLINK-3755:
---------------------------------------

GitHub user StefanRRichter opened a pull request:

    https://github.com/apache/flink/pull/2376

    [FLINK-3755] Introduce key groups for key-value state to support dynamic scaling

    This pull request introduces the concept of key groups to Flink. A key group is the smallest assignable unit of key-value state to a stream operator. Differently said, it is a sub set of the key space which can be assigned to a stream operator. With the introduction of key groups, it will be possible to dynamically scale Flink operators that use partitioned (=key-value) state.
    
    In particular, this pull request addresses the following sub-issues:
    - fully: [FLINK-4380] Introduce KeyGroupAssigner and Max-Parallelism Parameter
    - partially: [FLINK-4381] Refactor State to Prepare For Key-Group State Backends
    
    Furthermore, this pull request is partially based on pull request: #1988
    
    Overall, this pull request introduces the following changes:
    
    # 1) Adopted from #1988 (plus introduction of distributing keys as ranges (`KeyGroupRange`) 
    
    ## a) Introduction of KeyGroupAssigner
    
    In order to partition keys into key groups, the`KeyGroupAssigner` interface is introduced. This allows for partitioning the key space into smaller units which can be assigned to operators. A scale up/down of parallelism is then performed by simply reassigning the key groups to more/less operators.
    
    For this pull request, the former `HashPartitioner` is now renamed to `KeyGroupStreamPartitioner` and uses the `KeyGroupAssigner` to distribute the streaming records in a consistent way w.r.t. the key group mappings. The key groups, in turn, are mapped as ranges of key groups (`key group index * parallelism / number of key groups` = out-going channel) to the downstream tasks. 
    
    When restoring from a checkpoint or savepoint, scale up/down of parallelism basically boils down to splitting/merging the key group ranges in alignment with the adjusting assignment to operators that happens automatically through the `KeyGroupStreamPartitioner`.
    
    ## b) Introduction of MaxParallelism to user API
    
    In order to scale programs up or down, it is necessary to define the maximum number of key groups. The maximum number of key groups denotes the maximum parallelism of an operator, because every operator needs at least one key group to get elements assigned. Thus, in order to specify this upper limit, the ExecutionConfig allows to set a job-wide max parallelism value via ExecutionConfig.setMaxParallelism. In addition to that the SingleOutputStreamOperator allows to set similarly to the parallelism a max parallelism value on an operator basis. If the max parallelism has not been set and there is no job-wide max parallelism set, the parallelism of the operator will be taken as the max parallelism. Thus, every operator would then receive a single key group. Currently, we restrict the maximum number of key groups to 2^15 (Short.MAX_VALUE).
    
    # 2)  State and StateHandle refactoring
    
    ## a) StateHandle refactoring
    
    We have simplified and cleaned up the hierarchy and use cases of state handles. `StreamStateHandle` and `RetrievableStateHandle` are now at the heart of the state handles system.
    Their conceptual main difference is that `StreamStateHandle` provides a seekable input stream to the actual state data and leaves state reconstruction to client code, whereas `RetrievableStateHandle` represents a simple way for client code to retrieve state as readily constructed object and the state handle implementation taking care of state reconstruction.
    
    ## b) Operator serialization
    
    The unified abstraction for operators to persist their state is `CheckpointStateOutputStream`. All operators can simply directly write their serialized state into this stream, which returns a `StreamStateHandle` on close. `StreamTaskState` and `StreamTaskStateList` become obsolete. This change makes versioning of operator state serialization formats easier and we should ensure and test that our operators are aware of serialization versions.
    
    This change leaves the following methods for snapshot/restore in `StreamOperator`:
    ```
    void snapshotState(
        FSDataOutputStream out, 
        long checkpointId, 
        long timestamp) throws Exception;
    
    void restoreState(FSDataInputStream in) throws Exception;
    ```
    ## c) Split task state into operator state (= non-partitioned state) and keyed-state (= partitioned state)
    
    We have split the operator state into operator state and keyed state as follows. 
    
    Operator state is organized as a `ChainedStateHandle<StreamStateHandle>`. The chained state handle encapsulates the individual `StreamStateHandle` for all operators in an operator chain.
     
    Keyed state is organized as a `List<KeyGroupsStateHandle>`. Each `KeyGroupsStateHandle` consists of one `StreamStateHandle` and one `KeyGroupRangeOffsets` object. `KeyGroupRangeOffsets` denotes the range of all key groups that can are referenced by the handle, together with their respective stream offsets. The `StreamStateHandle` gives access to a seekable stream that contains the actual state data for all key groups from the key group range; individual key group states are located in the stream at their previously mentioned stream offsets.
    Notice that we have to provide a list of `KeyGroupsStateHandle` to support the case of scaling down parallelism. In this case, it can happen that key group states from several `KeyGroupsStateHandle` (each representing the state of one previously existing operator) have to be combined to form the keyed state of reduced amount of current operators. 


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink keyed-state

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2376.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2376
    
----
commit 8a57da24b499e059fb73bd7050a96d32b57fcec4
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-07-28T13:08:24Z

    [FLINK-4380] Introduce KeyGroupAssigner and Max-Parallelism Parameter
    
    This introduces a new KeySelector that assigns keys to key groups and
    also adds the max parallelism parameter throughout all API levels.
    
    This also adds tests for the newly introduced features.

commit 62fb798b762d8a69d30479561ed43b580facc600
Author: Stefan Richter <s....@data-artisans.com>
Date:   2016-08-11T09:59:07Z

    [FLINK-4381] Refactor State to Prepare For Key-Group State Backends

commit c038d6d9435c329ab4ca06c119cff5456924b5ab
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-08-11T10:14:18Z

    [FLINK-4380] Add tests for new Key-Group/Max-Parallelism
    
    This tests the rescaling features in CheckpointCoordinator and
    SavepointCoordinator.

commit 751effb855a81e6322a7e897c98dc59ea065d072
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2016-08-12T09:07:09Z

    Ignore RescalingITCase

----


> Introduce key groups for key-value state to support dynamic scaling
> -------------------------------------------------------------------
>
>                 Key: FLINK-3755
>                 URL: https://issues.apache.org/jira/browse/FLINK-3755
>             Project: Flink
>          Issue Type: New Feature
>    Affects Versions: 1.1.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>
> In order to support dynamic scaling, it is necessary to sub-partition the key-value states of each operator. This sub-partitioning, which produces a set of key groups, allows to easily scale in and out Flink jobs by simply reassigning the different key groups to the new set of sub tasks. The idea of key groups is described in this design document [1]. 
> [1] https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)