Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/07/06 16:30:00 UTC

[jira] [Commented] (FLINK-9486) Introduce TimerState in keyed state backend

    [ https://issues.apache.org/jira/browse/FLINK-9486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16535082#comment-16535082 ] 

ASF GitHub Bot commented on FLINK-9486:
---------------------------------------

GitHub user StefanRRichter opened a pull request:

    https://github.com/apache/flink/pull/6276

    [FLINK-9486] Introduce TimerState in keyed state backend

    ## What is the purpose of the change
    
    This PR integrates `InternalTimerQueue` with keyed state backends (Heap and RocksDB), so that we can use the RocksDB-based version in the job for the first time. 
    
    We introduce the interface `KeyGroupPartitionedPriorityQueue` as an easy adapter to the existing snapshotting code. It can probably be removed in a follow-up PR, once the queues are fully integrated with the backend's snapshotting.
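    
    To illustrate the idea of a key-group partitioned queue, here is a minimal sketch. It is not the interface from this PR: the class `KeyGroupPartitionSketch`, its `Entry` type, and the hash-modulo key-group assignment are all hypothetical simplifications. The point is only that elements are kept in one sub-queue per key group, so that a snapshot can write each key group separately while `poll()` still returns the global minimum.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch, not the interface introduced by the PR.
final class KeyGroupPartitionSketch {

    /** Hypothetical element: a key plus a timestamp that defines the priority. */
    static final class Entry implements Comparable<Entry> {
        final String key;
        final long timestamp;

        Entry(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }

        @Override
        public int compareTo(Entry other) {
            return Long.compare(timestamp, other.timestamp);
        }
    }

    // One sub-queue per key group.
    private final List<PriorityQueue<Entry>> groups = new ArrayList<>();

    KeyGroupPartitionSketch(int numKeyGroups) {
        for (int i = 0; i < numKeyGroups; i++) {
            groups.add(new PriorityQueue<>());
        }
    }

    /** Simplified assignment (an assumption): non-negative hash modulo the group count. */
    int keyGroupOf(String key) {
        return Math.floorMod(key.hashCode(), groups.size());
    }

    void add(Entry entry) {
        groups.get(keyGroupOf(entry.key)).add(entry);
    }

    /** Returns the smallest head across all sub-queues, or null if every sub-queue is empty. */
    Entry poll() {
        PriorityQueue<Entry> best = null;
        for (PriorityQueue<Entry> queue : groups) {
            if (!queue.isEmpty() && (best == null || queue.peek().compareTo(best.peek()) < 0)) {
                best = queue;
            }
        }
        return best == null ? null : best.poll();
    }

    /** The elements a snapshot would write for a single key group. */
    List<Entry> elementsOfKeyGroup(int keyGroup) {
        return new ArrayList<>(groups.get(keyGroup));
    }
}
```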
    
    The PR also addresses an issue with the `TreeOrderedCache`, which requires a "proper" `Comparator` (implemented in `TieBreakingPriorityComparator`), and we introduce `PriorityComparator` to emphasize this difference. `TieBreakingPriorityComparator` is likely to go away as well once we come up with an improved caching approach that is not simply based on a tree.
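    
    The difference between a priority-only comparison and a "proper" comparator can be shown with a plain `java.util.TreeSet`. The sketch below is an illustration only: `TimerEntry`, `PRIORITY_ONLY`, and `TIE_BREAKING` are hypothetical names, not classes from the PR. A tree set treats two elements as duplicates whenever the comparator returns 0, so a comparator that looks only at the priority silently drops distinct elements that share a timestamp.

```java
import java.util.Comparator;
import java.util.TreeSet;

// Hypothetical illustration, not code from the PR.
final class TieBreakingComparatorSketch {

    /** Hypothetical timer element: priority is the timestamp, identity also includes the key. */
    static final class TimerEntry {
        final long timestamp;
        final String key;

        TimerEntry(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }
    }

    /** Compares by priority only: two different timers with the same timestamp compare as 0. */
    static final Comparator<TimerEntry> PRIORITY_ONLY =
            Comparator.comparingLong(t -> t.timestamp);

    /** Breaks priority ties by key, so the result is 0 only for timers that are actually equal. */
    static final Comparator<TimerEntry> TIE_BREAKING =
            PRIORITY_ONLY.thenComparing(t -> t.key);

    /** Inserts two distinct timers with the same timestamp and reports how many the set kept. */
    static int distinctCount(Comparator<TimerEntry> comparator) {
        TreeSet<TimerEntry> set = new TreeSet<>(comparator);
        set.add(new TimerEntry(10L, "a"));
        set.add(new TimerEntry(10L, "b"));
        return set.size();
    }

    public static void main(String[] args) {
        System.out.println(distinctCount(PRIORITY_ONLY)); // prints 1: the second timer is dropped
        System.out.println(distinctCount(TIE_BREAKING));  // prints 2: both timers are kept
    }
}
```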
    
    We introduce `PriorityQueueSetFactory` to the keyed state backends, and this is where the queues are built. The current RocksDB implementation uses an additional RocksDB instance until we are fully integrated with backend snapshotting, because we would otherwise run into trouble with incremental snapshots.
    
    A configuration parameter is introduced to choose the queue implementation for RocksDB. For now, the default is still the heap variant.
    
    Finally, we introduce an additional method for bulk polling in the `InternalTimerQueue` interface that opens up future optimizations.
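    
    As a rough illustration of what bulk polling means, the following sketch drains all head elements that satisfy a condition in a single call. The method name `bulkPoll`, its signature, and the use of `java.util.PriorityQueue` are assumptions made for this example and do not reproduce the method added to `InternalTimerQueue`. A store-backed queue could use such a call to serve a whole batch in one pass instead of one lookup per element.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical illustration, not the signature from the PR.
final class BulkPollSketch {

    /** Removes head elements while they satisfy canConsume and hands each one to the consumer. */
    static <T> void bulkPoll(PriorityQueue<T> queue, Predicate<T> canConsume, Consumer<T> consumer) {
        T head;
        while ((head = queue.peek()) != null && canConsume.test(head)) {
            queue.poll();
            consumer.accept(head);
        }
    }

    /** Example use: collect every timestamp that is due at the given watermark, in order. */
    static List<Long> drainUpTo(long watermark, long... timestamps) {
        PriorityQueue<Long> queue = new PriorityQueue<>();
        for (long ts : timestamps) {
            queue.add(ts);
        }
        List<Long> fired = new ArrayList<>();
        bulkPoll(queue, ts -> ts <= watermark, fired::add);
        return fired;
    }

    public static void main(String[] args) {
        System.out.println(drainUpTo(20L, 30L, 10L, 20L, 40L)); // prints [10, 20]
    }
}
```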
    
    ## Verifying this change
    
    This change is already covered by existing tests, such as `AbstractEventTimeWindowCheckpointingITCase`, but the RocksDB-based queues currently have to be activated via 
    `RockDBBackendOptions.PRIORITY_QUEUE_STATE_TYPE`.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (yes, if activated)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink integrateSetStateWithBackends

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6276.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6276
    
----
commit 0d8743e52a658876425b6cef03fef3fef2d09deb
Author: Stefan Richter <s....@...>
Date:   2018-07-04T11:43:49Z

    Remove read options from RocksDBOrderedSetStore

commit 84b1b36357322cf23d50396cbfa0725db95797ea
Author: Stefan Richter <s....@...>
Date:   2018-07-04T11:51:14Z

    Introduce (temporary?/visible for testing) KeyGroupPartitionedPriorityQueue interface to work with the existing snapshotting

commit 35e02705f6740854ae18a92b5a6dfbafd3201a8f
Author: Stefan Richter <s....@...>
Date:   2018-07-04T16:07:54Z

    Basic integration with backends / make Rocks timers work

commit 1294ac356162430cf9de86980de1d4a0154f33b8
Author: Stefan Richter <s....@...>
Date:   2018-07-05T16:46:34Z

    Introduce PriorityComparator and tie breaking variant as adapter to collections that require a comparator.
    
    This is required because the tree set that we use in the cache expects that Comparators are aligned with Object#equals

commit bfd3a12e77348a79c91656d80a7a67ece9825103
Author: Stefan Richter <s....@...>
Date:   2018-07-05T19:35:08Z

    Iterator directly from cache if no store-only elements.

commit fbf26f1f2efbe1e2029d09d297808e26e08b87d8
Author: Stefan Richter <s....@...>
Date:   2018-07-06T08:22:49Z

    Use a dedicated RocksDB instance for priority queue state. We can revert
    this once priority queue state is properly integrated with the
    snapshotting. Until then, we must isolate the priority queue state in
    a separate db or else incremental checkpoints will break.

commit 75cb05ab21e07eaed25e1cac048919f7f313b3f6
Author: Stefan Richter <s....@...>
Date:   2018-07-06T13:55:02Z

    Configuration part

commit 7a86e268072ec4ad9d9fae2fa8e852b66d4424a8
Author: Stefan Richter <s....@...>
Date:   2018-07-06T14:48:53Z

    Introduce bulk poll method in queue to open up future optimizations

----


> Introduce TimerState in keyed state backend
> -------------------------------------------
>
>                 Key: FLINK-9486
>                 URL: https://issues.apache.org/jira/browse/FLINK-9486
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.6.0
>
>
> This is the first implementation subtask.
> The goal of this PR is to introduce a timer state that is registered with the keyed state backend, similar to other forms of keyed state.
> For the {{HeapKeyedStateBackend}}, this state lives on the same level as the {{StateTable}} that holds other forms of keyed state, and the implementation is basically backed by {{InternalTimerHeap}}.
> For {{RocksDBKeyedStateBackend}}, in this first step, we also introduce this state, outside of RocksDB and based upon {{InternalTimerHeap}}. This is an intermediate state, and we will later also implement the alternative of storing the timers inside a column family in RocksDB. However, by taking this step, we can still offer the option of RocksDB state with heap-based timers.


