You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2016/05/13 14:23:11 UTC

[GitHub] flink pull request: [FLINK-3761] Introduction of key groups

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/1988

    [FLINK-3761] Introduction of key groups

    This pull request introduces the concept of key groups to Flink. A key group is the smallest assignable unit of key-value state to a stream operator. Differently said, it is a sub set of the key space which is assigned to a stream operator. With the introduction of key groups, it will be possible to dynamically scale Flink operators which use partitioned (=key-value) state.
    
    In order to support key groups, the following components were added/changed:
    
    ## Introduction of KeyGroupStateBackend
    
    In order to make the state backends aware of the key groups, we had to introduce a new type of state backend `KeyGroupStateBackend`. A key group state backend behaves similarly to the old `AbstractStateBackend` just with the difference that it is aware of key groups. This means that it generates a snapshot for each key group upon snapshotting.
    
    In order to leverage the existing implementation of state backends, the `AbstractStateBackend` was split up into the new `AbstractStateBackend` which is responsible for managing non-partitioned state and the `PartitionedStateBackend` which is responsible for managing the partitioned state.
    
    Furthermore, the PR comes with a `GenericKeyGroupStateBackend` implementation which is the standard for the `KeyGroupStateBackend`. The `GenericKeyGroupStateBackend` simply creates for every key group a `PartitionedStateBackend` which manages this key group. Upon snapshotting, each `PartitionedStateBackend` is snapshot.
    
    ## Introduction of KeyGroupAssigner
    
    In order to assign keys to a key group, the `KeyGroupAssigner` interface is introduced. The `KeyGroupAssigner` implementation is used by the `GenericKeyGroupStateBackend` to select the proper `PartitionedStateBackend` for the current key. Furthermore, the former `HashPartitioner`, which is renamed now into `KeyGroupStreamPartitioner` uses the `KeyGroupAssigner` to distribute the streaming records in a consistent way wrt to the key group mappings. The key groups itself are mapped in a round robin fashion (key group index modulo #channels = out-going channel) to the downstream tasks.  
    
    ## Introduction of MaxParallelism to user API
    
    In order to scale programs up or down, it is necessary to define the maximum number of key groups. The maximum number of key groups denotes the maximum parallelism of an operator, because every operator needs at least one key group to get elements assigned. Thus, in order to specify this upper limit, the `ExecutionConfig` allows to set a job-wide max parallelism value via `ExecutionConfig.setMaxParallelism`. In addition to that the `SingleOutputStreamOperator` allows to set similarly to the parallelism a max parallelism value on an operator basis. If the max parallelism has not been set and there is no job-wide max parallelism set, the parallelism of the operator will be taken as the max parallelism. Thus, every operator would then receive a single key group.
    
    However, in order to scale jobs up and down, one has to set a max parallelism for the operators which shall be scaled up/down. The upper limit for the scaling is the max parallelism value. The max parallelism mustn't change when scaling the job, because it would destroy the mapping of keys to key groups. 
    
    - [X] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [X] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [X] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink keyGroups2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1988.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1988
    
----
commit ea0ca6ac53bd8f34fb8d6b1684f6db14a35ca1f0
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-03-14T08:41:08Z

    Introduce KeyGroupAssigner, KeyGroupStateBackend and KeyGroupKVState

commit 4bfdece4c9f823da1ad6fe8a894b09fbd0e8101f
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-04-07T12:05:34Z

    Add key group state handles to AcknowledgeCheckpoint message

commit 19aa0d62e60a9a148b5b8a145e5e95099d149799
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-04-07T12:06:41Z

    Rename StreamTaskState -> StreamOperatorState and StreamTaskStateList -> StreamTaskState

commit af2dfb01fc4951cf3ef548a210de657702f2f506
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-04-07T14:48:49Z

    Introduce KeyGroupState to StreamTask, StreamOperator and StateBackend

commit 32e705555f737a6b9971946bf2564309e42d4ea3
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-04-15T10:05:44Z

    Introduce KeyGroupStateBackend

commit 16d8111260dba5edfb507dfb06b71c5ea500c7ba
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-04-28T16:50:12Z

    Adapt test cases to work with refactored state backends

commit 088b5f9b37a7950125640b05e85231c70e501423
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-04-28T16:50:40Z

    Refactor RocksDbStateBackend into PartitionedRocksDbStateBackend and RocksDbStateBackend

commit f29d9a64181d2d2af59a4172d9a492d7d50e1c69
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-05-02T08:51:10Z

    Introduce partitioned and non-partitioned stream operator state

commit 2dd69e32b2ee0523bd18383f4c10072598158ba7
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-05-03T16:57:21Z

    Forward key group state to TDD

commit 45501c55fe76d5536668c1c4a1ca0bfd62ffa606
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-05-03T17:30:12Z

    Remove state descriptor generic parameter from KvState and KvStateSnapshot

commit f52ae87f831b62a7c596359fa779485e0231fee7
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-05-03T17:43:20Z

    Remove state generic parameter from KvState and KvStateSnapshot

commit 76e3db5a358857e43f8e03abdae28c476a68fc50
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-05-03T17:44:49Z

    Remove unused state descriptor from RocksDBReducingState

commit 6b8ad068b5d53e841e94debd7376313ce47c0a08
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-05-04T14:42:01Z

    Send key group state size to checkpoint coordinator

commit 62cf90df1da9a8724eb24713227318f4557c4915
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-05-04T14:52:11Z

    Add sanity check for number of key groups

commit 1a45cbcdbecd90d35457269fff6ddaa7e3889902
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-05-04T18:41:27Z

    Add max parallelism to job vertex and execution job vertex

commit acfee09eb66a35f61dd7e2d6912259de6f70e8e4
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-05-06T12:11:52Z

    Add setMaxParallelism to SingleOutputStreamOperator
    
    If max parallelism is not set, then the parallelism is chosen as the max
    parallelism.

commit 567ccb5fbd3cb9cb12acbc812ebda08b20cf71cf
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-05-06T13:29:47Z

    Add documentation

commit 2af688f7e2e5f99260755e90c1424e910cbd19d6
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-05-06T15:35:40Z

    Add setMaxParallelism to Scala API
    
    Set parallelism in StreamGraphGenerator if no default parallelism has been set

commit b7b302382718531f7395f0c06e426f616a8e5146
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-05-09T12:37:09Z

    Remove disposeAllStateForCurrentJob from AbstractStateBackend

commit c8dacbab2022ec216174ddf9dd5e2bddfcf7be11
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-05-10T19:57:01Z

    Add RescalingITCase

commit 7218af747db9f098b07f8375539340348a4b2146
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-05-11T07:53:52Z

    Rework CheckpointCoordinator to support rescaling

commit c81d88e2908a1f47cfa15b472e5efc871606ccc5
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-05-11T08:21:10Z

    Test rescaling job with non-partitioned state

commit 7f05674406175c9ba6b98378aa21efee35a2c746
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-05-11T08:33:53Z

    Introduce SavepointCoordinatorDeActivator/CheckpointCoordinatorDeActivatorCreator to resolve warning

commit c0eca81a8f1ac4167b0b3c6a325256456d45652c
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-05-11T08:57:22Z

    Add tests for HashKeyGroupAssigner

commit e6a4122342f11894e99f93c964919853fa5b04f2
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-05-11T11:48:16Z

    Add GenericKeyGroupStateBackendTest

commit 4455f9861a90d4d99e1a2fe963e89cbe47b264d5
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-05-11T15:52:17Z

    Add tests for configuring the KeyGroupStreamPartitioner

commit c83297936d4795dd8d6bc84654122b4f0cdc64ae
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-05-11T16:13:59Z

    Add StreamingJobGraphGeneratorTest
    
    Add test cases for connected streams

commit 5519ff559b9846196389b04bafeeb886266c0881
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-05-12T09:14:13Z

    Set proper access modifiers for CheckpointCoordinator

commit 6a36ee70f159ffdd69e81fcd202ba2fdbb743b3d
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-05-12T09:59:54Z

    Replace polling from StreamTaskTest

commit 9f2d01aaa03ebdbad3d1e62d164dd406ca364517
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-05-12T17:12:24Z

    Test checkpointing and recovery of StreamTask

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #1988: [FLINK-3761] Introduction of key groups

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/1988
  
    Yes will close the PR because it has been subsumed by #2440.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3761] Introduction of key groups

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1988#issuecomment-219083052
  
    @aljoscha would be great to get some feedback from you :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3761] Introduction of key groups

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/flink/pull/1988#issuecomment-219999205
  
    So if I understand currently there is no way to scale jobs with non-partitioned states. This also means that window operations (that register timers) will not be scalable right? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3761] Introduction of key groups

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1988#issuecomment-221309557
  
    Thanks for the initial feedback @aljoscha :-)
    
    The introduction of `PartitionedState` is indeed not strictly necessary for this PR. The idea was that we will have partitioned and non-partitioned state in the future. `PartitionedState` is the key-value state backed by the `PartitionedStateBackend` whereas non-partitioned state is backed by the `AbstractStateBackend`. The first non-partitioned state (apart from the state serialized via `CheckpointStateOutputStream`) could be the redistributable non-partitioned state necessary for the `KafkaSources`, for example. Thus, the `PartitionedState` is more of a logical separation and it lays the foundation so that also non-keyed stream operators can use a proper state abstraction. But I can revert it, if you deem it redundant or pre-mature.
    
    It is true that the `PartitionedStateBackend` and the `KeyGroupStateBackend` have **almost** the same signature. However, the changes you've mentioned are imho crucial and made the whole refactoring of the state backends necessary in the first place. The difference is that the `KeyGroupStateBackend` is aware of the key groups and, consequently, is able to snapshot and restore each key group individually. Trying to work around this would mean that the `PartitionedStateBackend` always has a single key group associated. But for that, it would have to know the sub task index of the enclosing `StreamOperator` to assign a sensible key group index. Furthermore, it wouldn't make sense to use any other `PartitionedStateBackend` than the `KeyGroupStateBackend` (given that it respects the `KeyGroupAssigner`) for the `AbstractStreamOperator`, because the data is shuffled according to the key group assignments. In general, I think the notion of key groups are touching too many parts of the Fl
 ink runtime so that it makes no longer sense to try to unify the `KeyGroupStateBackends` and `PartitionedStateBackends`. The state backends used by the `AbstractStreamOperator` have to be aware of that notion.
    
    You can regard the `PartitionedStateBackend` as an internal class which was introduced to reuse the existing state backend implementations via the `GenericKeyGroupStateBackend`. In the future it might make sense to directly implement the `KeyGroupStateBackend` interface to decrease the key group overhead. It's just unfortunate that Java does not allow to specify package private methods. Otherwise, I would have declared the `createPartitionedStateBackend` as package private. But since the `GenericKeyGroupStateBackend` resides in a sub-package of `o.a.f.runtime.state`, it cannot access this method. But I think we could refactor it the following way: Remove `createPartitionedStateBackend`, make `createKeyGroupStateBackend` abstract, let the implementations of `AbstractStateBackend` implement a `PartitionedStateBackendFactory` interface and define the `createKeyGroupStateBackend` method for all `AbstractStateBackend` implementations with creating a `GenericKeyGroupStateBackend` which
  requires a `PartitionedStateBackendFactory`. That would be probably a better design.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3761] Introduction of key groups

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1988#issuecomment-219082963
  
    No I haven't run performance benchmarks, yet. Should definitely do that as a follow-up. 
    
    You're right that the behaviour of RocksDB is interesting in particular. It might make sense for the future to implement a RocksDB specialised `KeyGroupStateBackend` implementation, which uses only a single RocksDB instance for all key groups.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3761] Introduction of key groups

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1988#issuecomment-221311349
  
    Jip, this last paragraph with the Factory is what I hinted at with my comment. \U0001f603 This may be somewhat academic but if there is a method `getPartitionedStateBackend` the likelihood of it being wrongly used is somewhat high.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3761] Introduction of key groups

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1988#issuecomment-221312885
  
    Yes definitely: better be safe than sorry. Will remove the `createPartitionedStateBackend` method from `AbstractStateBackend`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3761] Introduction of key groups

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1988#issuecomment-221289448
  
    I started looking into it, but man this is one big change... \U0001f603 
    
    I have some first remarks about API and internals:
    
    Whats the reason for the introduction of `PartitionedState`? The Javadoc for `State` already says that it is the base class for partitioned state and that it is only usable on a `KeyedStream`.
    
    The signature of `KeyGroupedStateBackend` and `PartitionedStateBackend` is exactly the same. `AbstractStateBackend` has both, method `createPartitionedStateBackend` and `createKeyGroupStateBackend`. Users of an `AbstractStateBackend` should only ever call the latter while the former is reserved for internal use by the default implementation for `KeyGroupedStateBackend` which is `GenericKeyGroupStateBackend`. Also, `AbstractStreamOperator` has the new method `getKeyGroupStateBackend` that should be used by operators such as the `WindowOperator` to deal with partitioned state. Now, where am I going with this? What I think is that the `AbstractStateBackend` should only have a method `createPartitionedStateBackend` that is externally visible. This would be used by the `AbstractStreamOperator` to create a state backend and users of the interface, i.e. `WindowOperator` would also deal just with `PartitionedStateBackend`, which they get from `AbstractStreamOperator.getPartitionedStateBa
 ckend`. The fact that there are these key groups should not be visible to users of a state backend. Internally, state backends would use the `GenericKeyGroupStateBackend`, they could provide an interface to it for creating non-key-grouped backends.
    
    Above, "exactly the same" is not 100 % correct, since the snapshot/restore methods differ slightly but I think this could be worked around. Also, I found it quite hard to express what I actually mean but I hope you get my point. \U0001f605 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #1988: [FLINK-3761] Introduction of key groups

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann closed the pull request at:

    https://github.com/apache/flink/pull/1988


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3761] Introduction of key groups

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1988#issuecomment-220289629
  
    In order to support scaling for non-partitioned state, the next step could be the introduction of a kind of union state. The idea would be to aggregate the non-partitioned state of each subtask and then sending it to all subtasks upon recovery. Then every subtask can pick from the union state what it needs for its execution


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #1988: [FLINK-3761] Introduction of key groups

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/1988
  
    I think this is subsumed by #2376 
    
    Should we close this Pull Request?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3761] Introduction of key groups

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/flink/pull/1988#issuecomment-219064791
  
    Very cool stuff! I was wondering did you do any benchmarks for the performance impact of this change? For instance it would be good to know how well RocksDB behaves with a large number of instances etc.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3761] Introduction of key groups

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1988#issuecomment-220293946
  
    Yeah, we were also wondering wether it would make sense to allow the state itself to be repartitioned, i.e. union and then split into the new parallelism. In this way we wouldn't read all state in every operator.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3761] Introduction of key groups

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1988#issuecomment-220002453
  
    @gyfora Yes, this is correct. We'll have to put the timers into partitioned state and upon restarting iterate over the saved timers to re-set any callbacks at the `StreamTask`.
    
    Doing this naively would slow things down. What I had in mind is to still keep the timers in an in-memory `Map` but before snapshotting put them into a partitioned state. This way, we don't have the overhead every time that we deal with timers but just when checkpointing.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---