Posted to issues@flink.apache.org by aljoscha <gi...@git.apache.org> on 2016/01/29 16:07:38 UTC

[GitHub] flink pull request: Enhance Partitioned State and use it in Window...

GitHub user aljoscha opened a pull request:

    https://github.com/apache/flink/pull/1562

    Enhance Partitioned State and use it in WindowOperator

    The commits in this PR are self-contained. The first one enhances the partitioned state so that it can be used by the WindowOperator. The second commit adds the required changes in WindowOperator. The third commit adds a state backend based on RocksDB. This means that the window operator can now run on different types of state backends transparently.
    
    @StephanEwen has some things he wants to change; those will be added here (I think). I'm opening this now so that people can have a look at it.
    
    ## [FLINK-3201] Enhance Partitioned State Interface with State Types
    Add new state types ValueState, ListState and ReducingState, where
    ListState and ReducingState derive from interface MergingState.
    
    ValueState behaves exactly the same as OperatorState. MergingState is a
    stateful list to which elements can be added and from which the
    contained elements can be retrieved. A ListState actually keeps the list
    of elements, while a ReducingState uses a reduce function to combine all
    added elements into one. To create a ValueState, the user passes a
    ValueStateIdentifier to StreamingRuntimeContext.getPartitionedState();
    for the other state types, they pass a ListStateIdentifier or a
    ReducingStateIdentifier instead.
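To make the relationship between the state types concrete, here is a minimal in-memory sketch. The interfaces below are simplified, hypothetical versions modelled on the description above (method names such as `add`, `get`, `value` and `update` are assumptions), not the exact interfaces in this PR, and the heap-based classes are for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Simplified, hypothetical interfaces modelled on the description above;
// they are not the actual Flink API.
interface ValueState<T> {
    T value();
    void update(T value);
}

// A MergingState accepts elements of type IN and exposes a result of type OUT.
interface MergingState<IN, OUT> {
    void add(IN value);
    OUT get();
}

interface ListState<T> extends MergingState<T, Iterable<T>> {}

interface ReducingState<T> extends MergingState<T, T> {}

// ListState keeps every element that was added.
class HeapListState<T> implements ListState<T> {
    private final List<T> elements = new ArrayList<>();
    public void add(T value) { elements.add(value); }
    public Iterable<T> get() { return elements; }
}

// ReducingState folds each added element into a single aggregate.
class HeapReducingState<T> implements ReducingState<T> {
    private final BinaryOperator<T> reduceFunction;
    private T aggregate; // null until the first element arrives

    HeapReducingState(BinaryOperator<T> reduceFunction) { this.reduceFunction = reduceFunction; }

    public void add(T value) {
        aggregate = (aggregate == null) ? value : reduceFunction.apply(aggregate, value);
    }

    public T get() { return aggregate; }
}

class StateTypesDemo {
    public static void main(String[] args) {
        ListState<Integer> list = new HeapListState<>();
        ReducingState<Integer> sum = new HeapReducingState<>(Integer::sum);
        for (int i = 1; i <= 4; i++) {
            list.add(i);
            sum.add(i);
        }
        System.out.println(list.get()); // [1, 2, 3, 4]
        System.out.println(sum.get());  // 10
    }
}
```

Both states accept the same `add()` calls; only what `get()` returns differs, which is what lets a caller stay agnostic to the concrete MergingState.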
    
    This change is necessary to give the system more information about the
    nature of the operator state. We want this to be able to do incremental
    snapshots. This would not be possible, for example, if the user had a
    List as a state. Inside OperatorState this list would be opaque and
    Flink could not create good incremental snapshots.
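A minimal sketch of why typed state helps incremental snapshots, assuming a hypothetical append-only list state (`IncrementalListState` and `snapshotDelta` are illustrative names, not part of Flink): because every `add()` goes through the state object, it can remember which elements were already covered by the last snapshot and return only the new ones. An opaque value holding a `java.util.List` would only reveal that "the whole object changed".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical append-only list state that can produce incremental snapshots.
class IncrementalListState<T> {
    private final List<T> elements = new ArrayList<>();
    private int snapshottedUpTo = 0; // number of elements covered by earlier snapshots

    void add(T value) { elements.add(value); }

    // Returns only the elements appended since the previous snapshot.
    List<T> snapshotDelta() {
        List<T> delta = new ArrayList<>(elements.subList(snapshottedUpTo, elements.size()));
        snapshottedUpTo = elements.size();
        return delta;
    }
}

class IncrementalSnapshotDemo {
    public static void main(String[] args) {
        IncrementalListState<String> state = new IncrementalListState<>();
        state.add("a");
        state.add("b");
        System.out.println(state.snapshotDelta()); // [a, b]
        state.add("c");
        System.out.println(state.snapshotDelta()); // [c]
    }
}
```

A real backend would also have to handle clearing and removal, which this sketch ignores.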
    
    This also refactors the StateBackend. Before, the logic for partitioned
    state was spread out over StreamingRuntimeContext,
    AbstractStreamOperator and StateBackend. Now it is consolidated in
    StateBackend.
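One way to picture the consolidation is a backend that owns all per-key tables, plus a current key that the operator sets before processing each record. The sketch below assumes this design; `HeapKeyedBackend`, `setCurrentKey` and `ValueHandle` are hypothetical names, not the actual StateBackend API from this PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, heavily simplified backend that owns all partitioned state.
// State handles it returns read and write the slot of whatever key is current.
class HeapKeyedBackend<K> {
    private K currentKey;
    private final Map<String, Map<K, Object>> tables = new HashMap<>();

    // Called by the operator before it processes each record.
    void setCurrentKey(K key) { this.currentKey = key; }

    <V> ValueHandle<V> getValueState(String name) {
        Map<K, Object> table = tables.computeIfAbsent(name, n -> new HashMap<>());
        return new ValueHandle<V>(table);
    }

    // A value-state handle scoped to the key that is current at access time.
    final class ValueHandle<V> {
        private final Map<K, Object> table;

        ValueHandle(Map<K, Object> table) { this.table = table; }

        @SuppressWarnings("unchecked")
        V value() { return (V) table.get(currentKey); }

        void update(V value) { table.put(currentKey, value); }
    }
}

class KeyedBackendDemo {
    public static void main(String[] args) {
        HeapKeyedBackend<String> backend = new HeapKeyedBackend<>();
        HeapKeyedBackend<String>.ValueHandle<Integer> count = backend.getValueState("count");
        for (String word : new String[] {"a", "b", "a"}) {
            backend.setCurrentKey(word);
            Integer c = count.value();
            count.update(c == null ? 1 : c + 1);
        }
        backend.setCurrentKey("a");
        System.out.println(count.value()); // 2
    }
}
```

The operator only sets the key; all knowledge of how per-key values are stored stays inside the backend.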
    
    This also adds support for partitioned state in two-input operators.
    
    ## [FLINK-3200] Use Partitioned State in WindowOperator
    
    This changes the window operator to use the new partitioned state
    abstraction for keeping window contents instead of custom internal
    state and the checkpointed interface.
    
    For now, timers are still kept as custom checkpointed state, however.
    
    WindowOperator now expects a StateIdentifier for a MergingState; this
    can be for either a ReducingState or a ListState, and WindowOperator is
    agnostic to the type of state. The signature of WindowFunction also
    changes to include the type of the intermediate input. For example, if
    a ReducingState is used, the input of the WindowFunction is T (where T
    is the input type); if a ListState is used, the input of the
    WindowFunction is Iterable[T].
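The following sketch shows how an operator can stay agnostic to the state type while the window function's input type follows from it. It assumes a simplified, hypothetical `MergingState<IN, OUT>` and uses `java.util.function.Function` as a stand-in for WindowFunction; none of these are the real Flink classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;

// Simplified, hypothetical stand-in for MergingState; not the Flink interface.
interface MergingState<IN, OUT> {
    void add(IN value);
    OUT get();
}

// ListState-like: keeps every element, exposes Iterable<T>.
class ListContents<T> implements MergingState<T, Iterable<T>> {
    private final List<T> elements = new ArrayList<>();
    public void add(T value) { elements.add(value); }
    public Iterable<T> get() { return elements; }
}

// ReducingState-like: keeps one aggregate, exposes T.
class ReducedContents<T> implements MergingState<T, T> {
    private final BinaryOperator<T> reducer;
    private T aggregate;
    ReducedContents(BinaryOperator<T> reducer) { this.reducer = reducer; }
    public void add(T value) { aggregate = (aggregate == null) ? value : reducer.apply(aggregate, value); }
    public T get() { return aggregate; }
}

// The operator never inspects ACC: it adds elements and, when the window
// fires, passes state.get() to the window function.
class SimpleWindowOperator<T, ACC, R> {
    private final MergingState<T, ACC> windowContents;
    private final Function<ACC, R> windowFunction;

    SimpleWindowOperator(Supplier<MergingState<T, ACC>> stateFactory, Function<ACC, R> windowFunction) {
        this.windowContents = stateFactory.get();
        this.windowFunction = windowFunction;
    }

    void processElement(T element) { windowContents.add(element); }

    R fire() { return windowFunction.apply(windowContents.get()); }
}

class WindowFunctionInputDemo {
    public static void main(String[] args) {
        // ReducingState: the window function receives a single T.
        SimpleWindowOperator<Integer, Integer, String> reducing =
            new SimpleWindowOperator<Integer, Integer, String>(
                () -> new ReducedContents<Integer>(Integer::sum), sum -> "sum=" + sum);
        // ListState: the window function receives Iterable<T>.
        SimpleWindowOperator<Integer, Iterable<Integer>, String> listing =
            new SimpleWindowOperator<Integer, Iterable<Integer>, String>(
                () -> new ListContents<Integer>(), all -> "all=" + all);
        for (int i = 1; i <= 3; i++) {
            reducing.processElement(i);
            listing.processElement(i);
        }
        System.out.println(reducing.fire()); // sum=6
        System.out.println(listing.fire());  // all=[1, 2, 3]
    }
}
```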

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/flink window-on-state

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1562.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1562
    
----
commit dc440ba4d97cecd297e432f14342dba5382cab50
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2016-01-25T11:33:51Z

    [FLINK-3201] Enhance Partitioned State Interface with State Types
    

commit 865723e3ff0133ba9d921907c298c3545fdfe32c
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2016-01-25T11:34:05Z

    [FLINK-3200] Use Partitioned State in WindowOperator
    

commit 1034a02380652ef5184e0094a41ef073c7c9b4fd
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2016-01-21T09:56:47Z

    [FLINK-3278] Add Partitioned State Backend Based on RocksDB

commit 13d1f541a48f82b1a10854addbdd5ea9bde7b079
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2016-01-28T17:18:26Z

    Move RocksDB backup to external processes

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Enhance Partitioned State and use it in Window...

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/flink/pull/1562#issuecomment-177611308
  
    Hey,
    First of all, great work! I am looking forward to having this in :)
    
    I think it would be good if we added default implementations of ListState and ReducingState based on ValueState, and had the AbstractStateBackend return those by default. I think this should go in before merging this.
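A rough sketch of what such defaults could look like, assuming a minimal, hypothetical `ValueState` with `value()`/`update()`; the `Generic*` class names are illustrative. Each `add()` on the list is a get-update-put on the underlying value, which is exactly the cost a specialized backend may want to avoid.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical minimal ValueState; any backend that can store one value per key provides it.
interface ValueState<T> {
    T value();
    void update(T value);
}

class SimpleValueState<T> implements ValueState<T> {
    private T v;
    public T value() { return v; }
    public void update(T value) { v = value; }
}

// Default ListState built purely on ValueState: every add() is a get-update-put.
class GenericListState<T> {
    private final ValueState<List<T>> backing;
    GenericListState(ValueState<List<T>> backing) { this.backing = backing; }

    void add(T value) {
        List<T> current = backing.value();                                   // get
        // copy, so the object handed out by the backend is never mutated
        List<T> updated = (current == null) ? new ArrayList<T>() : new ArrayList<T>(current);
        updated.add(value);                                                  // update
        backing.update(updated);                                             // put
    }

    Iterable<T> get() {
        List<T> current = backing.value();
        return current == null ? new ArrayList<T>() : current;
    }
}

// Default ReducingState built on ValueState: stores only the running aggregate.
class GenericReducingState<T> {
    private final ValueState<T> backing;
    private final BinaryOperator<T> reduceFunction;

    GenericReducingState(ValueState<T> backing, BinaryOperator<T> reduceFunction) {
        this.backing = backing;
        this.reduceFunction = reduceFunction;
    }

    void add(T value) {
        T current = backing.value();
        backing.update(current == null ? value : reduceFunction.apply(current, value));
    }

    T get() { return backing.value(); }
}

class DefaultStatesDemo {
    public static void main(String[] args) {
        GenericListState<String> list = new GenericListState<String>(new SimpleValueState<List<String>>());
        list.add("x");
        list.add("y");
        System.out.println(list.get()); // [x, y]
        GenericReducingState<Integer> max = new GenericReducingState<Integer>(new SimpleValueState<Integer>(), Integer::max);
        max.add(3);
        max.add(7);
        max.add(5);
        System.out.println(max.get()); // 7
    }
}
```

A backend only has to implement ValueState to get working list and reducing states; it can override them later where it has a cheaper native operation.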
    
    Now that we are approaching the release, we should make the interfaces clean and future additions easy, and I believe this is necessary for that. I think many backends will not have a very efficient way of implementing ListState and ReducingState, so they will naturally fall back to the default implementation.
    
    What do you think?
    
    Gyula



[GitHub] flink pull request: Enhance Partitioned State and use it in Window...

Posted by wenlonglwl <gi...@git.apache.org>.
Github user wenlonglwl commented on the pull request:

    https://github.com/apache/flink/pull/1562#issuecomment-191081091
  
    Thank you, @aljoscha. I agree with @StephanEwen: your purpose is to prevent users from using partitioned state in a windowed reduce function, but removing rich functions has many more side effects.



[GitHub] flink pull request: Enhance Partitioned State and use it in Window...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1562#issuecomment-178195987
  
    Rebased and extended this pull request in #1571



[GitHub] flink pull request: Enhance Partitioned State and use it in Window...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1562#issuecomment-177839471
  
    @gyfora you're right, I will add default implementations for ListState and ReducingState and use them in the DbStateBackend. For RocksDB there are custom implementations; for example, in ListState I can append a value to the state with a single call instead of a get-update-put operation.
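To illustrate the cost difference, the sketch below compares a get-update-put append with a native append on a hypothetical byte-array key-value store (`CountingKvStore` is illustrative and is not the RocksDB Java API; elements are one-byte arrays for simplicity). With get-update-put, every append rewrites the whole list, so the bytes written grow quadratically with the list length.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical key-value store over byte arrays; NOT the RocksDB Java API.
// It counts store calls and the bytes the caller hands to the store.
class CountingKvStore {
    private final Map<String, byte[]> data = new HashMap<>();
    int calls = 0;
    long bytesWritten = 0;

    byte[] get(String key) { calls++; return data.get(key); }

    void put(String key, byte[] value) {
        calls++;
        bytesWritten += value.length;
        data.put(key, value);
    }

    // Native append: the store combines internally; only the new bytes cross the API.
    void append(String key, byte[] suffix) {
        calls++;
        bytesWritten += suffix.length;
        byte[] old = data.getOrDefault(key, new byte[0]);
        byte[] combined = Arrays.copyOf(old, old.length + suffix.length);
        System.arraycopy(suffix, 0, combined, old.length, suffix.length);
        data.put(key, combined);
    }
}

class AppendVsReadModifyWrite {
    // Generic ListState path: read the whole list, add one element, write it all back.
    static void addViaGetUpdatePut(CountingKvStore store, String key, byte[] element) {
        byte[] old = store.get(key);
        if (old == null) old = new byte[0];
        byte[] updated = Arrays.copyOf(old, old.length + element.length);
        System.arraycopy(element, 0, updated, old.length, element.length);
        store.put(key, updated);
    }

    // Specialized path: a single call that ships only the new element.
    static void addViaAppend(CountingKvStore store, String key, byte[] element) {
        store.append(key, element);
    }

    public static void main(String[] args) {
        CountingKvStore rmw = new CountingKvStore();
        CountingKvStore app = new CountingKvStore();
        byte[] element = {42};
        for (int i = 0; i < 100; i++) {
            addViaGetUpdatePut(rmw, "window-1", element);
            addViaAppend(app, "window-1", element);
        }
        System.out.println(rmw.calls + " calls, " + rmw.bytesWritten + " bytes written"); // 200 calls, 5050 bytes written
        System.out.println(app.calls + " calls, " + app.bytesWritten + " bytes written"); // 100 calls, 100 bytes written
    }
}
```

Both paths end with identical stored bytes; only the number of calls and the data transferred differ.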



[GitHub] flink pull request: Enhance Partitioned State and use it in Window...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha closed the pull request at:

    https://github.com/apache/flink/pull/1562



[GitHub] flink pull request: Enhance Partitioned State and use it in Window...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1562#issuecomment-178111578
  
    @StephanEwen @gyfora The changes are already reflected in this PR.



[GitHub] flink pull request: Enhance Partitioned State and use it in Window...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1562#issuecomment-178146614
  
    Have rebased this onto the current master.
    
    One thing I'd like to change is to move the HDFS copy process utils from `flink-core` to `flink-runtime`.
    Ideally, we keep `flink-core` free of any Hadoop-dependent code.



[GitHub] flink pull request: Enhance Partitioned State and use it in Window...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1562#issuecomment-191683975
  
    Okay, that sounds like a design flaw: the aggregating functions are hidden inside the state, completely invisible to the window operator. I think the operator should be aware of the additional function, if only to call open()/close() on it.
    
    The fact that a RichReduceFunction cannot be used at all seems like a huge limitation.



[GitHub] flink pull request: Enhance Partitioned State and use it in Window...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1562#issuecomment-190649095
  
    Hi @wenlonglwl, the reason for this was mostly practical. We want to use our partitioned state abstraction for the window state because that makes it easy to change the state backend depending on the use case. So, when using a ReduceFunction, it is actually put into a ReducingState that keeps an aggregated value using that ReduceFunction. Having RichFunctions inside state would require a lot of overhead. First, the function would have to be copied for every key, because the function can keep state internally. Second, it would require wiring all the additional RichFunction calls (such as open(), close(), and the RuntimeContext) through to the state abstraction.
    
    Could you maybe use the `WindowedStream.apply(ReduceFunction, WindowFunction)` method? It allows you to incrementally aggregate the elements in the window and then get the final result in the WindowFunction once the window is emitted. This WindowFunction can be a RichFunction and can also keep state.
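A plain-Java sketch of the semantics described above, under the assumption that the reduce step runs eagerly per key and the window function sees only the final aggregate. `IncrementalWindow` and `FinalWindowFunction` are hypothetical stand-ins, not Flink classes; the point is that the lifecycle (`open()`) and any extra state live in the window function, while the reduce step stays plain.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical window function that is allowed to have a lifecycle and its own state.
interface FinalWindowFunction<T, R> {
    default void open() {}
    R apply(String key, T aggregate);
}

class IncrementalWindow<T, R> {
    private final BinaryOperator<T> reduceFunction;   // plain, stateless incremental step
    private final FinalWindowFunction<T, R> windowFunction;
    private final Map<String, T> partialAggregates = new LinkedHashMap<>();

    IncrementalWindow(BinaryOperator<T> reduceFunction, FinalWindowFunction<T, R> windowFunction) {
        this.reduceFunction = reduceFunction;
        this.windowFunction = windowFunction;
        windowFunction.open(); // the operator, not the state, drives this lifecycle call
    }

    // Each element is folded in immediately, so only one value per key is kept.
    void add(String key, T element) {
        partialAggregates.merge(key, element, reduceFunction);
    }

    // On emission, the window function sees only the final aggregate per key.
    List<R> fire() {
        List<R> results = new ArrayList<>();
        partialAggregates.forEach((k, agg) -> results.add(windowFunction.apply(k, agg)));
        partialAggregates.clear();
        return results;
    }
}

class IncrementalWindowDemo {
    public static void main(String[] args) {
        FinalWindowFunction<Integer, String> fn = new FinalWindowFunction<Integer, String>() {
            private int emitted; // state kept by the window function itself

            @Override public void open() { emitted = 0; }

            @Override public String apply(String key, Integer sum) {
                emitted++;
                return key + "=" + sum + " (#" + emitted + ")";
            }
        };
        IncrementalWindow<Integer, String> window = new IncrementalWindow<>(Integer::sum, fn);
        window.add("a", 1);
        window.add("b", 5);
        window.add("a", 2);
        System.out.println(window.fire()); // [a=3 (#1), b=5 (#2)]
    }
}
```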



[GitHub] flink pull request: Enhance Partitioned State and use it in Window...

Posted by wenlonglwl <gi...@git.apache.org>.
Github user wenlonglwl commented on the pull request:

    https://github.com/apache/flink/pull/1562#issuecomment-190511772
  
    Hi @aljoscha, I am confused about the change you made in this issue that makes RichFunctions unsupported as the aggregation function of a windowed stream. Rich functions are useful for custom initialization of complex functions. Can you explain the reason to me? Thank you in advance for your time.



[GitHub] flink pull request: Enhance Partitioned State and use it in Window...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1562#issuecomment-178130142
  
    Okay, nice, I hadn't looked at the code again after the changes.
    
    +1 to merge this then



[GitHub] flink pull request: Enhance Partitioned State and use it in Window...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1562#issuecomment-190769526
  
    I am wondering whether we could still allow rich functions in windows. The window operator, rather than the state, would need to call `open()` and `close()`. What would be the downside of that?
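A minimal sketch of this proposal, assuming simplified hypothetical interfaces (`Reducer`, `Lifecycle`, and the `*Sketch` classes are not Flink types): the operator keeps a reference to the same function instance that it hands to the state and drives `open()`/`close()` itself, while the state only ever calls `reduce()`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simplified interfaces; not Flink's ReduceFunction/RichFunction.
interface Reducer<T> { T reduce(T a, T b); }

interface Lifecycle { void open(); void close(); }

// The state only ever calls reduce(); it knows nothing about open()/close().
class ReducingStateSketch<T> {
    private final Reducer<T> reducer;
    private final Map<Object, T> perKey = new HashMap<>();

    ReducingStateSketch(Reducer<T> reducer) { this.reducer = reducer; }

    void add(Object key, T value) { perKey.merge(key, value, reducer::reduce); }

    T get(Object key) { return perKey.get(key); }
}

// The operator holds the same function instance and drives its lifecycle.
class WindowOperatorSketch<T> {
    private final Reducer<T> userFunction;
    final ReducingStateSketch<T> state;

    WindowOperatorSketch(Reducer<T> userFunction) {
        this.userFunction = userFunction;
        this.state = new ReducingStateSketch<>(userFunction);
    }

    void open() { if (userFunction instanceof Lifecycle) ((Lifecycle) userFunction).open(); }

    void close() { if (userFunction instanceof Lifecycle) ((Lifecycle) userFunction).close(); }
}

// A "rich" reducer that refuses to run before it has been opened.
class RichSum implements Reducer<Integer>, Lifecycle {
    boolean opened, closed;
    public void open() { opened = true; }
    public void close() { closed = true; }
    public Integer reduce(Integer a, Integer b) {
        if (!opened) throw new IllegalStateException("reduce() called before open()");
        return a + b;
    }
}

class LifecycleDemo {
    public static void main(String[] args) {
        RichSum fn = new RichSum();
        WindowOperatorSketch<Integer> op = new WindowOperatorSketch<>(fn);
        op.open();
        op.state.add("k", 1);
        op.state.add("k", 2);
        op.close();
        System.out.println(op.state.get("k") + " " + fn.closed); // 3 true
    }
}
```

Note that one instance is shared across all keys here, which is only safe if the function keeps no per-key internal state.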



[GitHub] flink pull request: Enhance Partitioned State and use it in Window...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1562#issuecomment-178099623
  
    I think this is in quite good shape.
    
    @gyfora's comment makes sense, +1 for such a default implementation.
    
    The changes I am making are based on this, but I would like to go ahead with merging this independently. Should we also apply the changes suggested by Gyula after merging this?



[GitHub] flink pull request: Enhance Partitioned State and use it in Window...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1562#issuecomment-191681857
  
    It is certainly possible to find some way of doing it but it is not straightforward. So for now I wanted to keep it simple until we figure out a good way to do it.
    
    @StephanEwen The window operator cannot call the rich methods since it doesn't know that there is a rich function inside the state. The rich function lifecycle has to somehow be managed by the StateBackend/State, so then you need to figure out when to call the methods. It's not possible when creating the StateBackend, since the state does not exist yet. So maybe you call open() when the state is first created. Do you call close() when you clear the state for all keys, since the state then technically no longer exists? Then, do you call open() again when it is created again? Also, what about the state methods on RuntimeContext? I don't think you can use ordinary RichFunctions there. (The same probably goes for most of the other methods on RuntimeContext.)
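The ambiguity can be made concrete with a hypothetical policy in which the state opens the function when its first key gets a value and closes it when the last key is cleared. In the sketch below (`StateManagedLifecycle` and `PerKeyReducer` are illustrative only, with counters standing in for real `open()`/`close()` calls), three windows that each fire and are purged lead to three open/close cycles rather than one per task.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical reduce interface for the sketch; not Flink's ReduceFunction.
interface PerKeyReducer<T> { T reduce(T a, T b); }

// Hypothetical policy: the STATE manages the function lifecycle. It "opens"
// the function when the first key gets a value and "closes" it when the
// last key is cleared. Counters stand in for real open()/close() calls.
class StateManagedLifecycle<T> {
    private final PerKeyReducer<T> reducer;
    private final Map<Object, T> perKey = new HashMap<>();
    int openCalls = 0;
    int closeCalls = 0;

    StateManagedLifecycle(PerKeyReducer<T> reducer) { this.reducer = reducer; }

    void add(Object key, T value) {
        if (perKey.isEmpty()) openCalls++; // the state comes into existence
        perKey.merge(key, value, reducer::reduce);
    }

    void clear(Object key) {
        // once the last key is removed, the state technically no longer exists
        if (perKey.remove(key) != null && perKey.isEmpty()) closeCalls++;
    }
}

class LifecycleAmbiguityDemo {
    public static void main(String[] args) {
        StateManagedLifecycle<Integer> state = new StateManagedLifecycle<>(Integer::sum);
        for (int window = 0; window < 3; window++) {
            state.add("k", 1);
            state.add("k", 2);
            state.clear("k"); // the window fired and its contents were purged
        }
        System.out.println(state.openCalls + " opens, " + state.closeCalls + " closes"); // 3 opens, 3 closes
    }
}
```

Under this policy, an expensive initialization in open() would be repeated for every window lifetime, which is one reason the lifecycle question needs more thought.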
    
    I think we need to find another way for these methods to have some kind of lifecycle management but it requires some thought.



[GitHub] flink pull request: Enhance Partitioned State and use it in Window...

Posted by gyfora <gi...@git.apache.org>.
Github user gyfora commented on the pull request:

    https://github.com/apache/flink/pull/1562#issuecomment-177611887
  
    This would also remove a lot of code duplication for the DbStateBackend and would probably also make the RocksDb backend a little cleaner :)

