Posted to issues@flink.apache.org by StefanRRichter <gi...@git.apache.org> on 2018/06/29 08:47:15 UTC
[GitHub] flink pull request #6227: Heap abstractions rocks
GitHub user StefanRRichter opened a pull request:
https://github.com/apache/flink/pull/6227
Heap abstractions rocks
## What is the purpose of the change
This PR is another step towards integrating the timer state with the keyed state backends.
First, the PR generalizes the data structure `InternalTimerHeap` to `InternalPriorityQueue` so that the functionality of a heap-set-organized state is decoupled from storing timers. The main reason for this is that state/backend related code lives in flink-runtime and timers are a concept from flink-streaming.
Second, the PR also introduces an implementation of `InternalPriorityQueue` with set semantics (i.e. the data structure we require to manage timers) that is based on RocksDB. State in RocksDB is always partitioned into key-groups, so the general idea is to organize the implementation as a heap-of-heaps: each sub-heap holds the elements of exactly one key-group, and the super-heap merges the sub-heaps by priority across key-group boundaries. The implementation reuses the in-memory implementation of `InternalPriorityQueue` (without set-properties) as the super-heap that holds the sub-heaps. Furthermore, each sub-heap is an instance of `CachingInternalPriorityQueueSet`, consisting of a "fast", "small" cache (`OrderedSetCache`) and a "slow", "unbounded" store (`OrderedSetStore`), currently applying simple write-through synchronization between cache and store. In the current implementation, the cache is based on an AVL tree and restricted in capacity. The store is backed by a RocksDB column family.
We use the cache to reduce read accesses to RocksDB.
Please note that the RocksDB implementation is not yet integrated with the timer service or the backend. This will happen in the next steps.
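To illustrate the cache/store split described above, here is a minimal sketch of a write-through ordered set. It is not the code from this PR: `WriteThroughOrderedSet` and its members are hypothetical names, the "store" is an in-memory `TreeSet` standing in for a RocksDB column family, and the eviction and refill policy is an assumption chosen to keep the example short.

```java
import java.util.TreeSet;

/** Hypothetical sketch, not Flink's CachingInternalPriorityQueueSet. */
final class WriteThroughOrderedSet {
    // Small, bounded cache holding the smallest elements of the store.
    private final TreeSet<Long> cache = new TreeSet<>();
    // Stand-in for the slow, unbounded store (RocksDB in the PR).
    private final TreeSet<Long> store = new TreeSet<>();
    private final int cacheCapacity;
    // Counts how often the cache had to be refilled from the store.
    int storeReads = 0;

    WriteThroughOrderedSet(int cacheCapacity) {
        if (cacheCapacity < 1) {
            throw new IllegalArgumentException("capacity must be >= 1");
        }
        this.cacheCapacity = cacheCapacity;
    }

    boolean add(long e) {
        boolean allCached = cache.size() == store.size();
        if (!store.add(e)) {
            return false; // set semantics: duplicates are rejected
        }
        if (allCached && cache.size() < cacheCapacity) {
            cache.add(e); // cache still mirrors the whole store
        } else if (!cache.isEmpty() && e < cache.last()) {
            cache.add(e);
            if (cache.size() > cacheCapacity) {
                cache.pollLast(); // evicted element remains in the store
            }
        }
        return true;
    }

    Long peek() {
        if (cache.isEmpty() && !store.isEmpty()) {
            refill();
        }
        return cache.isEmpty() ? null : cache.first();
    }

    Long poll() {
        Long head = peek();
        if (head != null) {
            cache.pollFirst();
            store.remove(head); // write-through: the store is updated as well
        }
        return head;
    }

    // Reads the smallest elements back from the store into the cache.
    private void refill() {
        storeReads++;
        for (Long e : store) {
            if (cache.size() >= cacheCapacity) {
                break;
            }
            cache.add(e);
        }
    }
}
```

Every write goes to the store, while `peek` and `poll` are answered from the cache and only touch the store for reading when the cache runs empty.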
## Brief change log
- Refactored `InternalTimerHeap` to decouple it from timers, moved the data structures from flink-streaming to flink-runtime (-> `InternalPriorityQueue`).
- Split the data structure into a hierarchy: a heap without set-semantics (`HeapPriorityQueue`) and a heap extended with set-semantics (`HeapPriorityQueueSet`).
- Introduced an implementation of RocksDB-based `InternalPriorityQueue` with set-semantics. Starting point is `KeyGroupPartitionedPriorityQueue`. This class uses a `HeapPriorityQueue` of `CachingInternalPriorityQueueSet` instances, each of which contains the elements of exactly one key-group (heap-of-heaps). For RocksDB, we configure each `CachingInternalPriorityQueueSet` to use a `TreeOrderedSetCache` and a `RocksDBOrderedStore`.
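
The heap-of-heaps layout from the last item can be sketched roughly as follows. This is an illustration under assumptions, not the `KeyGroupPartitionedPriorityQueue` from this PR: `SketchTimer` and `HeapOfHeapsSketch` are hypothetical names, the key-group is derived with a plain `hashCode` modulo rather than Flink's key-group assignment, each sub-heap is an in-memory `TreeSet`, and the super-heap is a `java.util.PriorityQueue`, whose `remove(Object)` is linear in the number of sub-heaps.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.TreeSet;

/** Hypothetical element type: ordered by timestamp, then by key. */
final class SketchTimer implements Comparable<SketchTimer> {
    final long timestamp;
    final String key;

    SketchTimer(long timestamp, String key) {
        this.timestamp = timestamp;
        this.key = key;
    }

    @Override
    public int compareTo(SketchTimer o) {
        int c = Long.compare(timestamp, o.timestamp);
        return c != 0 ? c : key.compareTo(o.key);
    }
}

/** Hypothetical sketch of a queue partitioned into one sub-heap per key-group. */
final class HeapOfHeapsSketch {
    /** Ordered set for one key-group; compared by identity in the super-heap. */
    private static final class SubHeap {
        final TreeSet<SketchTimer> elements = new TreeSet<>();
    }

    private final List<SubHeap> subHeaps = new ArrayList<>();
    // Super-heap over the non-empty sub-heaps, ordered by each sub-heap's head.
    private final PriorityQueue<SubHeap> superHeap =
        new PriorityQueue<>(Comparator.comparing((SubHeap s) -> s.elements.first()));

    HeapOfHeapsSketch(int numKeyGroups) {
        for (int i = 0; i < numKeyGroups; i++) {
            subHeaps.add(new SubHeap());
        }
    }

    // Assumption: simplified key-group assignment for illustration only.
    private SubHeap subHeapFor(SketchTimer t) {
        return subHeaps.get(Math.floorMod(t.key.hashCode(), subHeaps.size()));
    }

    boolean add(SketchTimer t) {
        SubHeap sub = subHeapFor(t);
        superHeap.remove(sub); // take it out while its head may change
        boolean added = sub.elements.add(t); // false for duplicates
        superHeap.add(sub); // re-insert at the position of its current head
        return added;
    }

    SketchTimer peek() {
        SubHeap sub = superHeap.peek();
        return sub == null ? null : sub.elements.first();
    }

    SketchTimer poll() {
        SubHeap sub = superHeap.poll();
        if (sub == null) {
            return null;
        }
        SketchTimer head = sub.elements.pollFirst();
        if (!sub.elements.isEmpty()) {
            superHeap.add(sub); // only non-empty sub-heaps stay in the super-heap
        }
        return head;
    }
}
```

Because each sub-heap is re-positioned in the super-heap whenever its head may have changed, `poll` returns elements in global priority order even though they are stored per key-group.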
## Verifying this change
I added dedicated tests for all data structures.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes, fastutil)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (yes)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/StefanRRichter/flink heapAbstractionsRocks
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/6227.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #6227
----
commit b3261a15bdf15207f50e2832a048fb3c84b8f642
Author: Stefan Richter <s....@...>
Date: 2018-06-19T08:01:30Z
Introduce MAX_ARRAY_SIZE as general constant
commit b5522ba34b2166e698a1695390a6d1fc4c671bb9
Author: Stefan Richter <s....@...>
Date: 2018-06-18T12:38:01Z
Generalization of timer queue to a queue(set) that is no longer coupled to timers and implementation for RocksDB.
----
---
[GitHub] flink pull request #6227: [FLINK-9491] Implement timer data structure based ...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter closed the pull request at:
https://github.com/apache/flink/pull/6227
---