Posted to issues@flink.apache.org by StefanRRichter <gi...@git.apache.org> on 2016/09/19 13:58:04 UTC

[GitHub] flink pull request #2512: Partitionable op state

GitHub user StefanRRichter opened a pull request:

    https://github.com/apache/flink/pull/2512

    Partitionable op state

    This pull request introduces rescalable non-partitioned operator state as described in issue [FLINK-4379] and makes the following major changes:
    
    # 1) Introducing interfaces `PartitionableStateBackend` and `SnapshotProvider`
    
    For rescaling purposes, non-partitioned state is now stored in a `PartitionableStateBackend`, which provides a method to register operator states under unique names and store state partitions in a list state:
    
    ```
    <S> ListState<S> getPartitionableState(
    	String name,
    	Collection<PartitionableOperatorStateHandle> restoreSnapshots,
    	TypeSerializer<S> partitionStateSerializer) throws Exception;
    ```
    
    Notice that a `TypeSerializer` is provided on a per-state basis so that we can realize backward compatibility if the serialization format changes.
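    To illustrate why a per-state serializer helps when the serialization format changes, here is a minimal, self-contained Java sketch. `StateSerializer`, the two offset serializers, and the leading version byte are hypothetical stand-ins, not Flink's `TypeSerializer` API. Because each state carries its own serializer, an upgraded serializer can still read bytes written in the old format.
    
    ```java
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    
    public class PerStateSerializerSketch {
    
        /** Hypothetical, minimal serializer interface (not Flink's TypeSerializer). */
        interface StateSerializer<T> {
            void serialize(T value, DataOutputStream out) throws IOException;
            T deserialize(DataInputStream in) throws IOException;
        }
    
        /** Old format: version byte 1, offset stored as a 4-byte int. */
        static final class OffsetSerializerV1 implements StateSerializer<Long> {
            public void serialize(Long value, DataOutputStream out) throws IOException {
                out.writeByte(1);
                out.writeInt(value.intValue());
            }
            public Long deserialize(DataInputStream in) throws IOException {
                if (in.readByte() != 1) throw new IOException("unknown format");
                return (long) in.readInt();
            }
        }
    
        /** New format: version byte 2, 8-byte long; still understands version 1 bytes. */
        static final class OffsetSerializerV2 implements StateSerializer<Long> {
            public void serialize(Long value, DataOutputStream out) throws IOException {
                out.writeByte(2);
                out.writeLong(value);
            }
            public Long deserialize(DataInputStream in) throws IOException {
                byte version = in.readByte();
                switch (version) {
                    case 1: return (long) in.readInt();
                    case 2: return in.readLong();
                    default: throw new IOException("unknown format version " + version);
                }
            }
        }
    
        static <T> byte[] write(StateSerializer<T> serializer, T value) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                serializer.serialize(value, out);
            }
            return bytes.toByteArray();
        }
    
        static <T> T read(StateSerializer<T> serializer, byte[] data) throws IOException {
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
                return serializer.deserialize(in);
            }
        }
    
        public static void main(String[] args) throws IOException {
            byte[] oldSnapshot = write(new OffsetSerializerV1(), 42L);
            // The upgraded job restores the old bytes with the new serializer.
            System.out.println(read(new OffsetSerializerV2(), oldSnapshot)); // prints 42
        }
    }
    ```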
    
    Furthermore, we introduce `SnapshotProvider` which offers a method to create snapshots:
    
    ```
    RunnableFuture<S> snapshot(
    	long checkpointId,
    	long timestamp,
    	CheckpointStreamFactory streamFactory) throws Exception;
    ```
    
    `DefaultPartitionableStateBackend` implements both `PartitionableStateBackend` and `SnapshotProvider`: the first interface is the view exposed to user code, while the latter is used only by the system to trigger snapshots.
    
    Similar to other state backends (e.g. `KeyedStateBackend`), a `DefaultPartitionableStateBackend` can be created and restored through the class `AbstractStateBackend`:
    
    ```
    public DefaultPartitionableStateBackend createPartitionableStateBackend(
    	Environment env,
    	String operatorIdentifier)
    
    public DefaultPartitionableStateBackend restorePartitionableStateBackend(
    	Environment env,
    	String operatorIdentifier,
    	Collection<PartitionableOperatorStateHandle> restoreSnapshots)
    ```
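    A minimal sketch of how one object can expose a user-facing view and a system-facing snapshot method, and how a snapshot handle feeds back into restore. All names (`PartitionableStore`, `Snapshotter`, `InMemoryBackend`) are hypothetical, and the "handle" is just an in-memory copy rather than a `PartitionableOperatorStateHandle` written through a `CheckpointStreamFactory`. `java.util.concurrent.FutureTask` is used because it implements `RunnableFuture`, so the system can decide where the snapshot work runs.
    
    ```java
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.RunnableFuture;
    
    public class SnapshotRestoreSketch {
    
        /** User-facing view: access named list states. */
        interface PartitionableStore {
            List<String> getListState(String name);
        }
    
        /** System-facing view: returns a snapshot the system can run synchronously or asynchronously. */
        interface Snapshotter<S> {
            RunnableFuture<S> snapshot(long checkpointId, long timestamp);
        }
    
        /** One object implementing both views; a "handle" here is an in-memory copy of all named states. */
        static final class InMemoryBackend
                implements PartitionableStore, Snapshotter<Map<String, List<String>>> {
    
            private final Map<String, List<String>> states = new HashMap<>();
    
            /** Restore path: merge all given handles (an empty collection means a fresh backend). */
            InMemoryBackend(Collection<Map<String, List<String>>> restoreSnapshots) {
                for (Map<String, List<String>> handle : restoreSnapshots) {
                    handle.forEach((name, partitions) ->
                            states.computeIfAbsent(name, n -> new ArrayList<>()).addAll(partitions));
                }
            }
    
            @Override
            public List<String> getListState(String name) {
                return states.computeIfAbsent(name, n -> new ArrayList<>());
            }
    
            @Override
            public RunnableFuture<Map<String, List<String>>> snapshot(long checkpointId, long timestamp) {
                // Copy synchronously so that later modifications do not leak into this checkpoint.
                Map<String, List<String>> copy = new HashMap<>();
                states.forEach((name, partitions) -> copy.put(name, List.copyOf(partitions)));
                // Deferred part; a real backend would write to a checkpoint stream here.
                return new FutureTask<>(() -> copy);
            }
        }
    
        public static void main(String[] args) throws Exception {
            InMemoryBackend backend = new InMemoryBackend(List.of());
            backend.getListState("offsets").add("partition-0:17");
    
            RunnableFuture<Map<String, List<String>>> future = backend.snapshot(1L, System.currentTimeMillis());
            future.run(); // the system decides which thread runs this
            Map<String, List<String>> handle = future.get();
    
            InMemoryBackend restored = new InMemoryBackend(List.of(handle));
            System.out.println(restored.getListState("offsets")); // prints [partition-0:17]
        }
    }
    ```
    
    Copying the state inside `snapshot(...)` keeps later modifications out of the checkpoint; only the (here trivial) materialization is deferred to the future.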
    
    # 2) Interface `PartitionableCheckpointed` as replacement for `Checkpointed`
    
    User code can use rescalable non-partitioned state by implementing the `PartitionableCheckpointed` interface. This interface is meant to replace the `Checkpointed` interface, which would become deprecated.
    
    ```
    void storeOperatorState(long checkpointId, PartitionableStateBackend backend) throws Exception;
    
    void restoreOperatorState(PartitionableStateBackend backend) throws Exception;
    ```
    
    Through the store/restore methods, user code can interact with `PartitionableStateBackend` and register/restore states as previously explained.
    
    Methods from this interface are invoked in `StreamTask::performCheckpoint(...)` and `StreamTask::restoreState()` respectively.
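    A sketch of user code on the store/restore hooks, assuming a simplified store where list state is a plain `java.util.List`. `StateStore`, `Checkpointed`, `MapStore`, and `OffsetTrackingSource` are hypothetical names, not the classes in this PR. Each (partition, offset) pair is written as its own list element, so a repartitioner could hand individual elements to different subtasks after rescaling.
    
    ```java
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;
    
    public class CheckpointedSourceSketch {
    
        /** Hypothetical stand-in for the user-facing PartitionableStateBackend. */
        interface StateStore {
            <S> List<S> getListState(String name);
        }
    
        /** Mirrors the two hooks of the proposed PartitionableCheckpointed interface. */
        interface Checkpointed {
            void storeOperatorState(long checkpointId, StateStore store) throws Exception;
            void restoreOperatorState(StateStore store) throws Exception;
        }
    
        /** Trivial in-memory store so the example runs on its own. */
        static final class MapStore implements StateStore {
            private final Map<String, List<Object>> states = new HashMap<>();
    
            @Override
            @SuppressWarnings("unchecked")
            public <S> List<S> getListState(String name) {
                return (List<S>) (List<?>) states.computeIfAbsent(name, n -> new ArrayList<>());
            }
        }
    
        /** Tracks one read offset per input partition, as a source might. */
        static final class OffsetTrackingSource implements Checkpointed {
            final Map<Integer, Long> offsets = new TreeMap<>();
    
            @Override
            public void storeOperatorState(long checkpointId, StateStore store) {
                List<Map.Entry<Integer, Long>> state = store.getListState("partition-offsets");
                state.clear();
                // One list element per partition, so elements can later be redistributed individually.
                offsets.forEach((partition, offset) -> state.add(Map.entry(partition, offset)));
            }
    
            @Override
            public void restoreOperatorState(StateStore store) {
                List<Map.Entry<Integer, Long>> state = store.getListState("partition-offsets");
                offsets.clear();
                for (Map.Entry<Integer, Long> e : state) {
                    offsets.put(e.getKey(), e.getValue());
                }
            }
        }
    
        public static void main(String[] args) throws Exception {
            OffsetTrackingSource source = new OffsetTrackingSource();
            source.offsets.put(0, 17L);
            source.offsets.put(3, 5L);
    
            MapStore store = new MapStore();
            source.storeOperatorState(1L, store);  // invoked when a checkpoint is taken
    
            OffsetTrackingSource recovered = new OffsetTrackingSource();
            recovered.restoreOperatorState(store); // invoked when the task restores its state
            System.out.println(recovered.offsets); // prints {0=17, 3=5}
        }
    }
    ```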
    
    # 3) Interface `StateRepartitioner`
    
    This interface allows implementing different repartitioning strategies that redistribute non-partitioned state when the parallelism changes. Currently, only one default strategy is implemented, in `RoundRobinStatePartitioner`, but other strategies are possible in general, e.g. a `StatePartitioner` that hands the union of the states of all previous parallel subtasks to each new parallel subtask. This interface is used in `CheckpointCoordinator::restoreLatestCheckpointedState()`.
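    The two strategies mentioned above can be sketched on plain lists of state partitions. This is an illustrative assumption about the distribution logic, not the code of `RoundRobinStatePartitioner`, which works on state handles rather than in-memory elements; `RepartitionSketch`, `roundRobin`, and `union` are hypothetical names.
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    
    public class RepartitionSketch {
    
        /** Round-robin: flatten the partitions of all previous subtasks and deal them out one by one. */
        static <T> List<List<T>> roundRobin(List<List<T>> previousSubtaskStates, int newParallelism) {
            List<List<T>> result = new ArrayList<>();
            for (int i = 0; i < newParallelism; i++) {
                result.add(new ArrayList<>());
            }
            int next = 0;
            for (List<T> subtaskState : previousSubtaskStates) {
                for (T partition : subtaskState) {
                    result.get(next).add(partition);
                    next = (next + 1) % newParallelism;
                }
            }
            return result;
        }
    
        /** Union: every new subtask receives all partitions of all previous subtasks. */
        static <T> List<List<T>> union(List<List<T>> previousSubtaskStates, int newParallelism) {
            List<T> all = new ArrayList<>();
            previousSubtaskStates.forEach(all::addAll);
            List<List<T>> result = new ArrayList<>();
            for (int i = 0; i < newParallelism; i++) {
                result.add(new ArrayList<>(all));
            }
            return result;
        }
    
        public static void main(String[] args) {
            // Two old subtasks holding five state partitions in total.
            List<List<String>> previous = List.of(List.of("a", "b", "c"), List.of("d", "e"));
            System.out.println(roundRobin(previous, 3)); // prints [[a, d], [b, e], [c]]
            System.out.println(union(previous, 2));      // prints [[a, b, c, d, e], [a, b, c, d, e]]
        }
    }
    ```
    
    Round-robin keeps each partition in exactly one place, which suits state like source offsets; the union variant suits state that every subtask needs to see in full.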
    
    TODO: After review, a description of the new features must still be added to the Flink documentation.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink partitionable-op-state

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2512.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2512
    
----
commit d133b948b8a201178e4ad3afa49e715bebf8eb5f
Author: Stefan Richter <s....@data-artisans.com>
Date:   2016-08-31T21:59:27Z

    State backend infrastructure to support partitionable op state.
    
    Introducing CheckpointStateHandles as container for all checkpoint related state handles
    
    tmp repartitioning function wip

commit 7213938be4bd706b57128c49cb24b24450ea1c87
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-09-07T16:52:48Z

    Add task chain length to TaskState

commit b639c8c62f64f840b95cd6191d7adf514eecddc6
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-09-08T21:11:11Z

    Forward partitionable operator state to PartitionableCheckpointed operators

commit 090d27f6daa173b43f1c5d2ee19cc661359b925a
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-09-08T22:05:13Z

    Make Kafka sources re-partitionable by using the partitionable non-keyed state
    
    Allow Kafka sources to use repartitionable state
    
    Remove CheckpointedAsynchronously from FlinkKafkaConsumerBase

commit bc964d7ad6c1608d8bae0db67efdb70c4c278086
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-09-09T13:34:38Z

    Make Kafka producer re-partitionable by removing the Checkpointed interface

commit 2366feeff842c3db39625da5a5912a98e83692db
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-09-09T17:02:32Z

    Fix HeapKeyedStateBackend.restorePartitionedState

commit e71cd8629ac1f195a53749d3303a7acf3cc4e4dd
Author: Stefan Richter <s....@data-artisans.com>
Date:   2016-09-15T16:42:55Z

    Partitionable State temporary WIP

commit e84998d2702b9461f59df14c5aa1bb8d7438457a
Author: Stefan Richter <s....@data-artisans.com>
Date:   2016-09-16T12:22:47Z

    Introduce partitionable state in savepoint serializer

commit a50629cb8eb876dfcd1af2199e4f4c08bc8562ad
Author: Stefan Richter <s....@data-artisans.com>
Date:   2016-09-16T12:37:34Z

    Using known chain length to replace maps with list of known size

commit 2921b478cbc9cadc2c205d8ee66c9f66c11245f1
Author: Stefan Richter <s....@data-artisans.com>
Date:   2016-09-16T17:56:02Z

    Added test case for rescaling partitionable state

commit 26b48fedc648715e0ed3e5a4642bede91c86ff02
Author: Stefan Richter <s....@data-artisans.com>
Date:   2016-09-19T10:16:26Z

    Code cleanup, minor refinements and documentation

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2512: [FLINK-4379] Rescalable non-partitioned state

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/2512
  
    I think most of the proposed renames are good. One thing that would be good to change is to remove `SnapshotProvider` and just have that method directly on `OperatorStateBackend`.
    
    I really like the separation between the user facing state store and the backend. I've been meaning to change that for keyed state as well, i.e. the user only gets a way to access state and not all the methods for snapshotting, closing, etc. from the backend.
    
    When I say I like the changes, I mean everything except that they have `Operator` in all of them. In the end, all state is at an operator, plus there is the existing interface `OperatorState` that is an alias for `ValueState`. I don't have a good alternative now, but I think the `Operator` could be confusing.



[GitHub] flink issue #2512: [FLINK-4379] Rescalable non-partitioned state

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2512
  
    I would like to go for a follow-up PR. This is currently blocking some other work, so getting it in soon would help.



[GitHub] flink issue #2512: [FLINK-4379] Rescalable non-partitioned state

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/2512
  
    Hi,
    
    I have some suggestions for renaming some of the interfaces and their methods in this pull request, to arrive at a clearer and more consistent naming scheme. I suggest the following changes:
    
    ## 1) Rename the state handle that points to operator state: PartitionableOperatorStateHandle -> OperatorStateHandle
    
    ## 2) Rename: PartitionableStateBackend -> OperatorStateStore
    ```
    	/**
    	 * User-side interface for storing (partitionable) operator state.
    	 */
    	public interface OperatorStateStore {
    
    		/**
    		 * Creates (or restores) the partitionable state in this backend. Each state is registered under a unique name.
    		 * The provided serializer is used to de/serialize the state in case of checkpointing (snapshot/restore).
    		 */
    		<S> ListState<S> getListState(String name, TypeSerializer<S> partitionStateSerializer) throws Exception;
    	}
    ```
    
    ## 3) Rename: PartitionableSnapshotStateBackend -> OperatorStateBackend
    
    I propose that the term "backend" now refers to a (i) store with the ability to (ii) take snapshots.
    ```
    	/**
    	 * Interface that combines both the user-facing {@link OperatorStateStore} interface and the system interface
    	 * {@link SnapshotProvider}.
    	 */
    	public interface OperatorStateBackend
    			extends OperatorStateStore, SnapshotProvider<OperatorStateHandle> {
    	}
    ```
    
    ## 4) Rename: PartitionableCheckpointed -> CheckpointedOperator
    - `storeOperatorState` -> `snapshotState`
    - `restoreOperatorState` -> `restoreState`
    
    ```
    	public interface CheckpointedOperator {
    
    		/**
    		 * This method is called when state should be stored for a checkpoint. The state can be registered and written to
    		 * the provided state store.
    		 */
    		void snapshotState(long checkpointId, OperatorStateStore stateStore) throws Exception;
    
    		/**
    		 * This method is called when state should be restored from a checkpoint. The state can be obtained from the
    		 * provided state store.
    		 */
    		void restoreState(OperatorStateStore stateStore) throws Exception;
    	}
    ```
    
    ## 5) Rename: StateRepartitioner -> OperatorStateRepartitioner
    ```
    	/**
    	 * Interface that allows to implement different strategies for repartitioning of operator state as parallelism changes.
    	 */
    	public interface OperatorStateRepartitioner {
    
    		List<Collection<OperatorStateHandle>> repartitionOperatorState(
    				List<OperatorStateHandle> previousParallelSubtaskStates,
    				int parallelism);
    	}
    ```
    
    ## 6) Add a new interface that allows user-friendly checkpointing code for simple cases that do not require a custom serializer
    ```
    	/**
    	 * Simplified interface as adapter to the more complex CheckpointedOperator
    	 */
    	public interface ListCheckpointed<T extends Serializable> {
    
    		List<T> snapshotState(long checkpointId) throws Exception;
    
    		void restoreState(List<T> state) throws Exception;
    	}
    ```
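    One way the `ListCheckpointed` adapter could bridge to `CheckpointedOperator` is sketched below. The interfaces are simplified copies of the proposal (the store returns a plain `List` and takes no serializer), and `ListCheckpointedAdapter`, `MapStore`, `CountingFunction`, and the state name are hypothetical. In a real backend the adapter would presumably register the list with a Java-serialization-based serializer, which is what the `T extends Serializable` bound makes possible.
    
    ```java
    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    public class ListCheckpointedAdapterSketch {
    
        /** Simplified OperatorStateStore: no serializer argument, list state is a plain List. */
        interface OperatorStateStore {
            <S> List<S> getListState(String name) throws Exception;
        }
    
        interface CheckpointedOperator {
            void snapshotState(long checkpointId, OperatorStateStore stateStore) throws Exception;
            void restoreState(OperatorStateStore stateStore) throws Exception;
        }
    
        interface ListCheckpointed<T extends Serializable> {
            List<T> snapshotState(long checkpointId) throws Exception;
            void restoreState(List<T> state) throws Exception;
        }
    
        /** Hypothetical adapter that stores the user's list under one fixed state name. */
        static final class ListCheckpointedAdapter<T extends Serializable> implements CheckpointedOperator {
            static final String STATE_NAME = "list-checkpointed-state";
            private final ListCheckpointed<T> userFunction;
    
            ListCheckpointedAdapter(ListCheckpointed<T> userFunction) {
                this.userFunction = userFunction;
            }
    
            @Override
            public void snapshotState(long checkpointId, OperatorStateStore stateStore) throws Exception {
                List<T> state = stateStore.getListState(STATE_NAME);
                state.clear();
                state.addAll(userFunction.snapshotState(checkpointId));
            }
    
            @Override
            public void restoreState(OperatorStateStore stateStore) throws Exception {
                List<T> state = stateStore.getListState(STATE_NAME);
                userFunction.restoreState(new ArrayList<>(state));
            }
        }
    
        /** Trivial in-memory store so the example runs on its own. */
        static final class MapStore implements OperatorStateStore {
            private final Map<String, List<Object>> states = new HashMap<>();
    
            @Override
            @SuppressWarnings("unchecked")
            public <S> List<S> getListState(String name) {
                return (List<S>) (List<?>) states.computeIfAbsent(name, n -> new ArrayList<>());
            }
        }
    
        /** User function that only deals with plain lists. */
        static final class CountingFunction implements ListCheckpointed<Long> {
            long count;
    
            @Override
            public List<Long> snapshotState(long checkpointId) {
                return List.of(count);
            }
    
            @Override
            public void restoreState(List<Long> state) {
                count = 0;
                for (Long partial : state) {
                    count += partial; // after scaling in, one subtask may receive several partial counts
                }
            }
        }
    
        public static void main(String[] args) throws Exception {
            CountingFunction function = new CountingFunction();
            function.count = 7;
            MapStore store = new MapStore();
            new ListCheckpointedAdapter<>(function).snapshotState(1L, store);
    
            CountingFunction restored = new CountingFunction();
            new ListCheckpointedAdapter<>(restored).restoreState(store);
            System.out.println(restored.count); // prints 7
        }
    }
    ```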
    
    ## 7) OperatorStateBackend lifecycle
    
    Another point that we might want to discuss is the lifecycle of `OperatorStateBackend`. Currently, a new backend is created (and restored) for each invocation of the methods in `CheckpointedOperator`. This always provides a clean backend in which to collect the operator state for a snapshot. I wonder whether it would make sense to create the `OperatorStateBackend` just once per `AbstractStreamOperator`, similar to the `KeyedStateBackend`. This would give users the option to keep their operator state only in the `OperatorStateBackend`. However, we would need a way to signal that all state must be handed to the backend before a snapshot is taken. This way, for example, large operator states could be managed in RocksDB, and we could add more proxy collections over time (currently we only support a list of substates).
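    A sketch of the create-once lifecycle, assuming a hypothetical `prepareSnapshot(...)` hook as the signal that all state must be in the backend before the snapshot. `Backend`, `Operator`, and `SnapshotPreparation` are illustrative names only. State kept directly in the long-lived backend needs no extra work, while state the user keeps elsewhere is flushed in the hook.
    
    ```java
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    public class LongLivedBackendSketch {
    
        /** Stand-in for the proposed user-facing OperatorStateStore. */
        interface OperatorStateStore {
            <S> List<S> getListState(String name);
        }
    
        /** Hypothetical hook: user code must hand all remaining state to the store here. */
        interface SnapshotPreparation {
            void prepareSnapshot(long checkpointId, long timestamp, OperatorStateStore store) throws Exception;
        }
    
        /** Long-lived in-memory backend; snapshot() copies whatever the store holds at that moment. */
        static final class Backend implements OperatorStateStore {
            private final Map<String, List<Object>> states = new HashMap<>();
    
            @Override
            @SuppressWarnings("unchecked")
            public <S> List<S> getListState(String name) {
                return (List<S>) (List<?>) states.computeIfAbsent(name, n -> new ArrayList<>());
            }
    
            Map<String, List<Object>> snapshot() {
                Map<String, List<Object>> copy = new HashMap<>();
                states.forEach((name, list) -> copy.put(name, new ArrayList<>(list)));
                return copy;
            }
        }
    
        /** The operator creates its backend once and reuses it for every checkpoint. */
        static final class Operator {
            final Backend backend = new Backend();
            private final SnapshotPreparation userHook;
    
            Operator(SnapshotPreparation userHook) {
                this.userHook = userHook;
            }
    
            Map<String, List<Object>> checkpoint(long checkpointId, long timestamp) throws Exception {
                userHook.prepareSnapshot(checkpointId, timestamp, backend); // signal: flush everything now
                return backend.snapshot();
            }
        }
    
        public static void main(String[] args) throws Exception {
            long[] localCounter = {3}; // state the user keeps outside the backend
            Operator op = new Operator((id, ts, store) -> {
                List<Long> counts = store.getListState("count");
                counts.clear();
                counts.add(localCounter[0]);
            });
    
            // State kept directly in the long-lived backend is picked up without a copy by user code.
            List<String> seen = op.backend.getListState("seen");
            seen.add("x");
    
            Map<String, List<Object>> snapshot = op.checkpoint(1L, 1000L);
            System.out.println(snapshot.get("count") + " " + snapshot.get("seen")); // prints [3] [x]
        }
    }
    ```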
    
    What do you think @aljoscha @StephanEwen ?



[GitHub] flink issue #2512: [FLINK-4379] Rescalable non-partitioned state

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2512
  
    I think we should also update the documentation as part of this PR (there's a page dedicated to working with state)



[GitHub] flink issue #2512: [FLINK-4379] Rescalable non-partitioned state

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2512
  
    +1, I like the changes.
    
    Concerning (7) - I think creating the state backend once sounds good. It is more in line with the keyed state backend.
    
    Concerning Aljoscha's comments: Removing SnapshotProvider makes sense, if it is only an internal interface and never exposed to the user.
    
    Concerning the name `OperatorState` - I actually like the name. It is state that exists in the instance of an operator, assuming that functions are also (special cases of) operators.
    I would also use this pull request to remove the deprecated `OperatorState` interface (superclass of `ValueState`).



[GitHub] flink pull request #2512: [FLINK-4379] Rescalable non-partitioned state

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2512



[GitHub] flink issue #2512: [FLINK-4379] Rescalable non-partitioned state

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/2512
  
    Please review @tillrohrmann @StephanEwen



[GitHub] flink issue #2512: [FLINK-4379] Rescalable non-partitioned state

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2512
  
    Had an in-depth code walkthrough with @StefanRRichter - comments are addressed in additional commits.
    
    Comments:
      - Use `StateDescriptor` as well for operator state
      - For the `snapshotState()` and `prepareSnapshot()` calls, the checkpoint timestamp should be passed as well.
      - The Kafka Consumer should use the same state as the `snapshotState()` interface, so as to allow for backwards compatible checkpoints/savepoints.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---