Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2016/09/19 13:58:20 UTC
[jira] [Commented] (FLINK-4379) Add Rescalable Non-Partitioned State
[ https://issues.apache.org/jira/browse/FLINK-4379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15503567#comment-15503567 ]
ASF GitHub Bot commented on FLINK-4379:
---------------------------------------
GitHub user StefanRRichter opened a pull request:
https://github.com/apache/flink/pull/2512
Partitionable op state
This pull request introduces rescalable non-partitioned operator state as described in issue [FLINK-4379] and makes the following major changes:
# 1) Introducing interfaces `PartitionableStateBackend` and `SnapshotProvider`
For rescaling purposes, non-partitioned state is now stored in a `PartitionableStateBackend`, which provides a method to register operator states under unique names and store state partitions in a list state:
```
<S> ListState<S> getPartitionableState(
        String name,
        Collection<PartitionableOperatorStateHandle> restoreSnapshots,
        TypeSerializer<S> partitionStateSerializer) throws Exception;
```
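The registration contract can be pictured with a minimal in-memory sketch. It assumes a heavily simplified model: `SketchListState` and `SketchPartitionableBackend` are hypothetical stand-ins rather than Flink classes, and the `restoreSnapshots` and `partitionStateSerializer` parameters are left out. Each name maps to exactly one list of partitions, so asking for the same name twice returns the same state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for a list state: an append-only list of partitions.
final class SketchListState<S> {
    private final List<S> partitions = new ArrayList<>();

    void add(S partition) { partitions.add(partition); }

    List<S> get() { return partitions; }
}

// Hypothetical sketch of a backend that registers list states under unique names.
final class SketchPartitionableBackend {
    private final Map<String, SketchListState<?>> registered = new HashMap<>();

    // Returns the state registered under `name`, creating an empty one on first use.
    @SuppressWarnings("unchecked")
    <S> SketchListState<S> getPartitionableState(String name) {
        return (SketchListState<S>) registered.computeIfAbsent(name, n -> new SketchListState<S>());
    }

    int registeredStateCount() { return registered.size(); }
}
```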
Notice that a `TypeSerializer` is provided on a per-state basis so that we can realize backward compatibility if the serialization format changes.
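One way a per-state serializer can keep older snapshots readable is to write a format version in front of the payload. The sketch below only illustrates that idea: `VersionedLongListSerializer` is hypothetical, it uses plain `java.io` streams instead of Flink's `TypeSerializer`, and its two formats (int and long partitions) are invented for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical per-state serializer (not Flink's TypeSerializer). Each snapshot starts
// with a format version, so a newer serializer can still decode an older format.
final class VersionedLongListSerializer {
    static final int LEGACY_VERSION = 1;   // partitions written as 32-bit ints
    static final int CURRENT_VERSION = 2;  // partitions written as 64-bit longs

    static byte[] write(List<Long> partitions, int version) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(version);
            out.writeInt(partitions.size());
            for (long p : partitions) {
                if (version == LEGACY_VERSION) {
                    out.writeInt((int) p);
                } else {
                    out.writeLong(p);
                }
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static List<Long> read(byte[] snapshot) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(snapshot));
            int version = in.readInt();
            if (version != LEGACY_VERSION && version != CURRENT_VERSION) {
                throw new IllegalArgumentException("Unknown format version " + version);
            }
            int size = in.readInt();
            List<Long> partitions = new ArrayList<>(size);
            for (int i = 0; i < size; i++) {
                // Legacy snapshots are widened from int to long while reading.
                partitions.add(version == LEGACY_VERSION ? (long) in.readInt() : in.readLong());
            }
            return partitions;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```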
Furthermore, we introduce `SnapshotProvider`, which offers a method to create snapshots:
```
RunnableFuture<S> snapshot(
        long checkpointId,
        long timestamp,
        CheckpointStreamFactory streamFactory) throws Exception;
```
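Returning a `RunnableFuture` makes it possible to split a snapshot into a cheap synchronous part and a deferred part that runs whenever the future is executed. The sketch below uses hypothetical types (`SketchSnapshotHandle`, `SketchSnapshotProvider`, `SketchListBackend`), drops the `CheckpointStreamFactory` parameter, and its deferred "write" step only builds an in-memory handle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

// Hypothetical handle describing one completed snapshot.
final class SketchSnapshotHandle {
    final long checkpointId;
    final long timestamp;
    final List<String> partitions;

    SketchSnapshotHandle(long checkpointId, long timestamp, List<String> partitions) {
        this.checkpointId = checkpointId;
        this.timestamp = timestamp;
        this.partitions = partitions;
    }
}

// Simplified snapshot provider: the CheckpointStreamFactory parameter is omitted.
interface SketchSnapshotProvider<S> {
    RunnableFuture<S> snapshot(long checkpointId, long timestamp) throws Exception;
}

final class SketchListBackend implements SketchSnapshotProvider<SketchSnapshotHandle> {
    final List<String> partitions = new ArrayList<>();

    @Override
    public RunnableFuture<SketchSnapshotHandle> snapshot(long checkpointId, long timestamp) {
        // Copy synchronously so later modifications do not leak into this snapshot...
        final List<String> copy = new ArrayList<>(partitions);
        // ...and defer building the handle until someone runs the future.
        return new FutureTask<>(() -> new SketchSnapshotHandle(checkpointId, timestamp, copy));
    }
}
```

In this sketch the caller decides whether to call `run()` inline or hand the future to an executor; the copy taken before returning keeps the snapshot consistent either way.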
`DefaultPartitionableStateBackend` implements both `PartitionableStateBackend` and `SnapshotProvider`: the former provides the view that is exposed to user code, while the latter is used only by the system to trigger snapshots.
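The split between the user-facing and the system-facing interface can be sketched as one class behind two narrow interfaces. All names here (`UserStateView`, `SystemSnapshotView`, `SketchDefaultBackend`) are hypothetical simplifications of `PartitionableStateBackend`, `SnapshotProvider`, and `DefaultPartitionableStateBackend`, with state fixed to lists of strings.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical user-facing view: register and access named list states.
interface UserStateView {
    List<String> getPartitionableState(String name);
}

// Hypothetical system-facing view: used by the runtime to take snapshots.
interface SystemSnapshotView {
    Map<String, List<String>> snapshot(long checkpointId);
}

// One backend implements both views.
final class SketchDefaultBackend implements UserStateView, SystemSnapshotView {
    private final Map<String, List<String>> states = new HashMap<>();

    @Override
    public List<String> getPartitionableState(String name) {
        return states.computeIfAbsent(name, n -> new ArrayList<>());
    }

    @Override
    public Map<String, List<String>> snapshot(long checkpointId) {
        // Copy every registered list so the snapshot is isolated from later updates.
        Map<String, List<String>> copy = new HashMap<>();
        states.forEach((name, list) -> copy.put(name, new ArrayList<>(list)));
        return copy;
    }
}
```

User code that only holds a `UserStateView` reference cannot trigger a snapshot through it, while the runtime can use the same object through `SystemSnapshotView`.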
Similar to other state backends (e.g. `KeyedStateBackend`), a `DefaultPartitionableStateBackend` can be created and restored through the class `AbstractStateBackend`:
```
public DefaultPartitionableStateBackend createPartitionableStateBackend(
        Environment env,
        String operatorIdentifier)

public DefaultPartitionableStateBackend restorePartitionableStateBackend(
        Environment env,
        String operatorIdentifier,
        Collection<PartitionableOperatorStateHandle> restoreSnapshots)
```
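Creating and restoring differ only in how the backend is seeded. The sketch below is a hypothetical simplification: `SketchStateBackendFactory`, `SketchRestorableBackend`, and `SketchOperatorStateHandle` are invented names, the `Environment` parameter is omitted, and it assumes that a subtask receiving several handles after rescaling concatenates their partitions per state name.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical snapshot handle: state name -> list of partitions.
final class SketchOperatorStateHandle {
    final Map<String, List<Integer>> partitionsByName;

    SketchOperatorStateHandle(Map<String, List<Integer>> partitionsByName) {
        this.partitionsByName = partitionsByName;
    }
}

final class SketchRestorableBackend {
    final String operatorIdentifier;
    final Map<String, List<Integer>> states = new HashMap<>();

    SketchRestorableBackend(String operatorIdentifier) {
        this.operatorIdentifier = operatorIdentifier;
    }
}

// Hypothetical factory: `create` starts empty, `restore` seeds the backend from handles.
final class SketchStateBackendFactory {
    SketchRestorableBackend createPartitionableStateBackend(String operatorIdentifier) {
        return new SketchRestorableBackend(operatorIdentifier);
    }

    SketchRestorableBackend restorePartitionableStateBackend(
            String operatorIdentifier,
            Collection<SketchOperatorStateHandle> restoreSnapshots) {
        SketchRestorableBackend backend = createPartitionableStateBackend(operatorIdentifier);
        // Concatenate the partitions of all received handles, per state name.
        for (SketchOperatorStateHandle handle : restoreSnapshots) {
            handle.partitionsByName.forEach((name, parts) ->
                    backend.states.computeIfAbsent(name, n -> new ArrayList<>()).addAll(parts));
        }
        return backend;
    }
}
```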
# 2) Interface `PartitionableCheckpointed` as replacement for `Checkpointed`
User code can use rescalable non-partitioned state by implementing the `PartitionableCheckpointed` interface. This interface is meant to replace the `Checkpointed` interface, which would become deprecated.
```
void storeOperatorState(long checkpointId, PartitionableStateBackend backend) throws Exception;
void restoreOperatorState(PartitionableStateBackend backend) throws Exception;
```
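A user function implementing the interface might look like the following sketch. `MiniBackend`, `MiniPartitionableCheckpointed`, and `CountingFunction` are hypothetical simplifications (the backend returns a plain `List<Long>` rather than Flink's `ListState`). Summing all restored partitions is one possible merge policy for a counter, chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical minimal backend: named lists of partitions.
final class MiniBackend {
    private final Map<String, List<Long>> states = new HashMap<>();

    List<Long> getPartitionableState(String name) {
        return states.computeIfAbsent(name, n -> new ArrayList<>());
    }
}

// Simplified stand-in for the proposed interface.
interface MiniPartitionableCheckpointed {
    void storeOperatorState(long checkpointId, MiniBackend backend) throws Exception;
    void restoreOperatorState(MiniBackend backend) throws Exception;
}

// Hypothetical user function that counts records and keeps the count as one partition.
final class CountingFunction implements MiniPartitionableCheckpointed {
    long count;

    void process(String record) { count++; }

    @Override
    public void storeOperatorState(long checkpointId, MiniBackend backend) {
        List<Long> state = backend.getPartitionableState("count");
        state.clear();  // replace the partition stored for the previous checkpoint
        state.add(count);
    }

    @Override
    public void restoreOperatorState(MiniBackend backend) {
        // This subtask may receive several partitions; here they are summed.
        count = 0;
        for (long partial : backend.getPartitionableState("count")) {
            count += partial;
        }
    }
}
```

For example, if partitions holding counts 3 and 4 end up in the same subtask, the restored function in this sketch starts from 7.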
Through the store/restore methods, user code can interact with `PartitionableStateBackend` and register/restore states as previously explained.
These methods are invoked in `StreamTask::performCheckpoint(...)` and `StreamTask::restoreState()`, respectively.
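The call order can be illustrated with a hypothetical task that walks its operator chain and calls only those operators that implement the checkpointing interface. `ChainCheckpointed`, `SketchTask`, and `RecordingOperator` are invented for this sketch, the backend argument is dropped, and the method names merely echo `StreamTask::restoreState()` and `StreamTask::performCheckpoint(...)` without reproducing their real logic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the proposed interface (backend parameter omitted).
interface ChainCheckpointed {
    void storeOperatorState(long checkpointId);
    void restoreOperatorState();
}

// Hypothetical task running a chain of operators; only some of them keep state.
final class SketchTask {
    final List<Object> operatorChain;

    SketchTask(List<Object> operatorChain) { this.operatorChain = operatorChain; }

    // Restore every stateful operator before processing starts.
    void restoreState() {
        for (Object op : operatorChain) {
            if (op instanceof ChainCheckpointed) {
                ((ChainCheckpointed) op).restoreOperatorState();
            }
        }
    }

    // On a checkpoint, ask every stateful operator to store its state.
    void performCheckpoint(long checkpointId) {
        for (Object op : operatorChain) {
            if (op instanceof ChainCheckpointed) {
                ((ChainCheckpointed) op).storeOperatorState(checkpointId);
            }
        }
    }
}

// Hypothetical stateful operator that records which calls it received.
final class RecordingOperator implements ChainCheckpointed {
    final List<String> calls = new ArrayList<>();

    @Override
    public void storeOperatorState(long checkpointId) { calls.add("store:" + checkpointId); }

    @Override
    public void restoreOperatorState() { calls.add("restore"); }
}
```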
# 3) Interface `StateRepartitioner`
This interface allows implementing repartitioning strategies that redistribute non-partitioned state when the parallelism changes. Currently, there is only one default strategy, implemented in `RoundRobinStatePartitioner`, but in general it is possible to implement different strategies, e.g. a `StatePartitioner` that distributes the union of the states of all previous parallel subtasks to each parallel subtask. This interface is used in `CheckpointCoordinator::restoreLatestCheckpointedState()`.
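Both strategies can be sketched over plain lists, where each inner list holds the state partitions of one previous subtask. `SketchStateRepartitioner`, `RoundRobinSketch`, and `UnionSketch` are hypothetical; this round-robin variant deals out partitions one at a time, which may differ from how `RoundRobinStatePartitioner` actually assigns them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical repartitioner: maps the partitions of all old subtasks to one
// list of partitions per new subtask.
interface SketchStateRepartitioner<T> {
    List<List<T>> repartition(List<List<T>> previousPartitions, int newParallelism);
}

// Deals the partitions out one by one across the new subtasks.
final class RoundRobinSketch<T> implements SketchStateRepartitioner<T> {
    @Override
    public List<List<T>> repartition(List<List<T>> previousPartitions, int newParallelism) {
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }
        int next = 0;
        for (List<T> subtaskPartitions : previousPartitions) {
            for (T partition : subtaskPartitions) {
                result.get(next).add(partition);
                next = (next + 1) % newParallelism;
            }
        }
        return result;
    }
}

// Gives every new subtask the union of all previous partitions.
final class UnionSketch<T> implements SketchStateRepartitioner<T> {
    @Override
    public List<List<T>> repartition(List<List<T>> previousPartitions, int newParallelism) {
        List<T> union = new ArrayList<>();
        previousPartitions.forEach(union::addAll);
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(union));
        }
        return result;
    }
}
```

For example, repartitioning `[[p0, p1, p2], [p3]]` to three subtasks with `RoundRobinSketch` yields `[[p0, p3], [p1], [p2]]`, whereas `UnionSketch` gives every new subtask all four partitions.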
TODO: After review, a description of the new features must still be added to the Flink documentation.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/StefanRRichter/flink partitionable-op-state
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/2512.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2512
----
commit d133b948b8a201178e4ad3afa49e715bebf8eb5f
Author: Stefan Richter <s....@data-artisans.com>
Date: 2016-08-31T21:59:27Z
State backend infrastructure to support partitionable op state.
Introducing CheckpointStateHandles as container for all checkpoint related state handles
tmp repartitioning function wip
commit 7213938be4bd706b57128c49cb24b24450ea1c87
Author: Till Rohrmann <tr...@apache.org>
Date: 2016-09-07T16:52:48Z
Add task chain length to TaskState
commit b639c8c62f64f840b95cd6191d7adf514eecddc6
Author: Till Rohrmann <tr...@apache.org>
Date: 2016-09-08T21:11:11Z
Forward partitionable operator state to PartitionableCheckpointed operators
commit 090d27f6daa173b43f1c5d2ee19cc661359b925a
Author: Till Rohrmann <tr...@apache.org>
Date: 2016-09-08T22:05:13Z
Make Kafka sources re-partitionable by using the partitionable non-keyed state
Allow Kafka sources to use repartitionable state
Remove CheckpointedAsynchronously from FlinkKafkaConsumerBase
commit bc964d7ad6c1608d8bae0db67efdb70c4c278086
Author: Till Rohrmann <tr...@apache.org>
Date: 2016-09-09T13:34:38Z
Make Kafka producer re-partitionable by removing the Checkpointed interface
commit 2366feeff842c3db39625da5a5912a98e83692db
Author: Till Rohrmann <tr...@apache.org>
Date: 2016-09-09T17:02:32Z
Fix HeapKeyedStateBackend.restorePartitionedState
commit e71cd8629ac1f195a53749d3303a7acf3cc4e4dd
Author: Stefan Richter <s....@data-artisans.com>
Date: 2016-09-15T16:42:55Z
Partitionable State temporary WIP
commit e84998d2702b9461f59df14c5aa1bb8d7438457a
Author: Stefan Richter <s....@data-artisans.com>
Date: 2016-09-16T12:22:47Z
Introduce partitionable state in savepoint serializer
commit a50629cb8eb876dfcd1af2199e4f4c08bc8562ad
Author: Stefan Richter <s....@data-artisans.com>
Date: 2016-09-16T12:37:34Z
Using known chain length to replace maps with list of known size
commit 2921b478cbc9cadc2c205d8ee66c9f66c11245f1
Author: Stefan Richter <s....@data-artisans.com>
Date: 2016-09-16T17:56:02Z
Added test case for rescaling partitionable state
commit 26b48fedc648715e0ed3e5a4642bede91c86ff02
Author: Stefan Richter <s....@data-artisans.com>
Date: 2016-09-19T10:16:26Z
Code cleanup, minor refinements and documentation
----
> Add Rescalable Non-Partitioned State
> ------------------------------------
>
> Key: FLINK-4379
> URL: https://issues.apache.org/jira/browse/FLINK-4379
> Project: Flink
> Issue Type: New Feature
> Components: State Backends, Checkpointing
> Reporter: Ufuk Celebi
> Assignee: Stefan Richter
>
> This issue is associated with [FLIP-8| https://cwiki.apache.org/confluence/display/FLINK/FLIP-8%3A+Rescalable+Non-Partitioned+State].
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)