Posted to issues@flink.apache.org by StefanRRichter <gi...@git.apache.org> on 2018/06/29 08:47:15 UTC

[GitHub] flink pull request #6227: Heap abstractions rocks

GitHub user StefanRRichter opened a pull request:

    https://github.com/apache/flink/pull/6227

    Heap abstractions rocks

    ## What is the purpose of the change
    
    This PR is another step towards integrating the timer state with the keyed state backends.
    
    First, the PR generalizes the data structure `InternalTimerHeap` to `InternalPriorityQueue` so that the functionality of a heap-set-organized state is decoupled from storing timers. The main reason for this is that state/backend related code lives in flink-runtime and timers are a concept from flink-streaming.
    
    Second, the PR introduces an implementation of `InternalPriorityQueue` with set semantics (i.e. the data structure we require to manage timers) that is based on RocksDB. State in RocksDB is always partitioned into key-groups, so the general idea is to organize the implementation as a heap-of-heaps: each sub-heap holds the elements of exactly one key-group, and the super-heap merges the sub-heaps by priority across key-group boundaries. The implementation reuses the in-memory implementation of `InternalPriorityQueue` (without set properties) as the super-heap that holds the sub-heaps. Furthermore, each sub-heap is an instance of `CachingInternalPriorityQueueSet`, consisting of a "fast", "small" cache (`OrderedSetCache`) and a "slow", "unbounded" store (`OrderedSetStore`), currently applying simple write-through synchronization between cache and store. In the current implementation, the cache is based on an AVL tree and is restricted in capacity. The store is backed by a RocksDB column family. We use caching to reduce read accesses to RocksDB.
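
As a rough illustration of the heap-of-heaps idea described above, the following minimal Java sketch keeps one ordered set per key-group and a super-heap of the non-empty sub-heaps, ordered by each sub-heap's smallest element. It is not the code from this PR: the class `HeapOfHeapsSketch`, its methods, the use of `long` timestamps as elements, and the choice of `java.util.TreeSet` and `java.util.PriorityQueue` as building blocks are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.TreeSet;

/** Illustrative heap-of-heaps; a hypothetical stand-in, not the classes added in this PR. */
final class HeapOfHeapsSketch {

    /** One sub-heap per key-group. No equals() override, so the super-heap compares by identity. */
    private static final class SubHeap {
        final TreeSet<Long> elements = new TreeSet<>(); // ordered set gives set semantics
    }

    private final List<SubHeap> subHeaps = new ArrayList<>();

    // Super-heap of the non-empty sub-heaps, ordered by each sub-heap's smallest element.
    private final PriorityQueue<SubHeap> superHeap = new PriorityQueue<SubHeap>(
        (a, b) -> Long.compare(a.elements.first(), b.elements.first()));

    HeapOfHeapsSketch(int numKeyGroups) {
        for (int i = 0; i < numKeyGroups; i++) {
            subHeaps.add(new SubHeap());
        }
    }

    /** Adds an element to the sub-heap of its key-group; returns false for a duplicate. */
    boolean add(int keyGroup, long timestamp) {
        SubHeap sub = subHeaps.get(keyGroup);
        // Take the sub-heap out of the super-heap before its head can change.
        if (!sub.elements.isEmpty()) {
            superHeap.remove(sub);
        }
        boolean added = sub.elements.add(timestamp);
        superHeap.add(sub); // re-insert at the position of its (possibly new) head
        return added;
    }

    /** Returns the smallest element across all key-groups, or null if empty. */
    Long peek() {
        SubHeap head = superHeap.peek();
        return head == null ? null : head.elements.first();
    }

    /** Removes and returns the smallest element across all key-groups, or null if empty. */
    Long poll() {
        SubHeap head = superHeap.poll();
        if (head == null) {
            return null;
        }
        Long result = head.elements.pollFirst();
        if (!head.elements.isEmpty()) {
            superHeap.add(head); // the sub-heap re-enters with its next element as head
        }
        return result;
    }

    public static void main(String[] args) {
        HeapOfHeapsSketch queue = new HeapOfHeapsSketch(3);
        queue.add(0, 30L);
        queue.add(1, 10L);
        queue.add(2, 20L);
        for (Long t = queue.poll(); t != null; t = queue.poll()) {
            System.out.println(t); // elements come out merged by priority across key-groups
        }
    }
}
```

The sketch removes and re-inserts a sub-heap whenever its head may change, because `java.util.PriorityQueue` has no operation to re-prioritize an element in place. A heap that tracks element positions could adjust the sub-heap directly instead.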
    
    Please note that the RocksDB implementation is currently not yet integrated with the timer service or the backend. This will happen in the next steps.
    
    ## Brief change log
    
    - Refactored `InternalTimerHeap` to decouple it from timers, moved the data structures from flink-streaming to flink-runtime (-> `InternalPriorityQueue`).
    - Split the data structure into a hierarchy: a heap without set-semantics (`HeapPriorityQueue`) and a heap extended with set-semantics (`HeapPriorityQueueSet`).
    - Introduced an implementation of RocksDB-based `InternalPriorityQueue` with set-semantics. The starting point is `KeyGroupPartitionedPriorityQueue`. This class uses a `HeapPriorityQueue` of `CachingInternalPriorityQueueSet` elements that each contain elements for exactly one key-group (heap-of-heaps). For RocksDB, we configure each `CachingInternalPriorityQueueSet` to use a `TreeOrderedSetCache` and a `RocksDBOrderedStore`.
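
To make the cache/store split of a single sub-heap more concrete, here is a minimal Java sketch of an ordered set with a bounded cache in front of an unbounded store and write-through synchronization. It is an assumption-laden illustration, not the PR's `CachingInternalPriorityQueueSet`: the class `CachingSetSketch`, its methods, and the `storeReads` counter are hypothetical, and a plain in-memory `TreeSet` stands in for the RocksDB-backed store.

```java
import java.util.Iterator;
import java.util.TreeSet;

/** Illustrative bounded cache over an unbounded ordered store; hypothetical, not the PR's classes. */
final class CachingSetSketch {
    private final int capacity;
    private final TreeSet<Long> cache = new TreeSet<>(); // "fast", "small"
    private final TreeSet<Long> store = new TreeSet<>(); // stand-in for the "slow" RocksDB store
    private int storeReads = 0;                          // counts scans of the store

    CachingSetSketch(int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    // Invariant: the cache holds the smallest cache.size() elements of the store,
    // and the cache is empty only when the store is empty.

    /** Write-through add; returns false if the element is already present. */
    boolean add(long element) {
        if (!store.add(element)) {
            return false; // set semantics
        }
        boolean allCachedBefore = cache.size() == store.size() - 1;
        if (allCachedBefore && cache.size() < capacity) {
            cache.add(element);
        } else if (!cache.isEmpty() && element < cache.last()) {
            cache.add(element);
            if (cache.size() > capacity) {
                cache.pollLast(); // evicted from the cache only; it stays in the store
            }
        }
        return true;
    }

    /** Served from the cache without touching the store. */
    Long peek() {
        return cache.isEmpty() ? null : cache.first();
    }

    /** Write-through removal of the smallest element, or null if empty. */
    Long poll() {
        if (cache.isEmpty()) {
            return null; // by the invariant, the store is empty as well
        }
        long head = cache.pollFirst();
        store.remove(head);
        if (cache.isEmpty() && !store.isEmpty()) {
            refillCacheFromStore();
        }
        return head;
    }

    int storeReads() {
        return storeReads;
    }

    /** Reloads the smallest elements from the store in a single ordered scan. */
    private void refillCacheFromStore() {
        storeReads++;
        Iterator<Long> it = store.iterator();
        while (cache.size() < capacity && it.hasNext()) {
            cache.add(it.next());
        }
    }

    public static void main(String[] args) {
        CachingSetSketch set = new CachingSetSketch(2);
        for (long t : new long[] {5L, 3L, 9L, 1L}) {
            set.add(t);
        }
        for (Long t = set.poll(); t != null; t = set.poll()) {
            System.out.println(t);
        }
        System.out.println("store scans: " + set.storeReads());
    }
}
```

In this sketch, reads of the head are answered from the cache, and the store is scanned only when the cache runs empty, which is the effect the caching is meant to achieve for RocksDB.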
    
    
    ## Verifying this change
    
    I added dedicated tests for all data structures.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes, fastutil)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (yes)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink heapAbstractionsRocks

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6227.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6227
    
----
commit b3261a15bdf15207f50e2832a048fb3c84b8f642
Author: Stefan Richter <s....@...>
Date:   2018-06-19T08:01:30Z

    Introduce MAX_ARRAY_SIZE as general constant

commit b5522ba34b2166e698a1695390a6d1fc4c671bb9
Author: Stefan Richter <s....@...>
Date:   2018-06-18T12:38:01Z

    Generalization of timer queue to a queue(set) that is no longer coupled to timers and implementation for RocksDB.

----


---

[GitHub] flink pull request #6227: [FLINK-9491] Implement timer data structure based ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter closed the pull request at:

    https://github.com/apache/flink/pull/6227


---