Posted to dev@flink.apache.org by "Roman Khachatryan (Jira)" <ji...@apache.org> on 2022/11/08 10:55:03 UTC

[jira] [Created] (FLINK-29928) Allow sharing (RocksDB) memory between slots

Roman Khachatryan created FLINK-29928:
-----------------------------------------

             Summary: Allow sharing (RocksDB) memory between slots
                 Key: FLINK-29928
                 URL: https://issues.apache.org/jira/browse/FLINK-29928
             Project: Flink
          Issue Type: New Feature
          Components: Runtime / Configuration, Runtime / State Backends, Runtime / Task
            Reporter: Roman Khachatryan
            Assignee: Roman Khachatryan
             Fix For: 1.17.0


h1. Background and motivation

RocksDB is one of the main consumers of off-heap memory, which it uses for BlockCache, MemTables, Indices and Bloom Filters.
Since 1.10 (FLINK-7289), it is possible to:
- share these objects among RocksDB instances of the same slot
- bound the total memory usage by all RocksDB instances of a TM

The memory is divided between the slots equally (unless using fine-grained resource control).
This is sub-optimal if some slots contain more memory-intensive tasks than others.

The proposal is to widen the scope of memory sharing to the TM, so that the memory can be shared across all of its RocksDB instances.
That would reduce the overall memory consumption at the cost of resource isolation.

h1. Proposed changes

h2. Configuration
- introduce "taskmanager.memory.managed.shared-fraction" (0..1, default 0)
-- cluster-level (yaml only)
-- the non-shared memory will be used as it is now (exclusively per-slot)
- introduce "state.backend.memory.share-scope"
-- job-level (yaml and StateBackend)
-- possible values: NONE, SLOT, TASK_MANAGER
-- default: not set
-- override "state.backend.rocksdb.memory.fixed-per-slot" if both are set (but don't deprecate it, because it specifies the size)
- rely on the existing "state.backend.rocksdb.memory.managed" to decide whether the shared memory is managed or unmanaged
- when calculating TM-wise shared memory, ignore "taskmanager.memory.managed.consumer-weights" because RocksDB is the only consumer so far
- similarly, exclude StateBackend from weights calculations, so other consumers (e.g. PYTHON) can better utilize exclusive slot memory
- use cluster-level or default configuration when creating TM-wise shared RocksDB objects, e.g. "state.backend.rocksdb.memory.managed", "state.backend.rocksdb.memory.write-buffer-ratio"

h2. Example
{code}
taskmanager.memory.managed.size: 1gb
taskmanager.memory.managed.shared-fraction: .75 # all slots share 750Mb of shared managed memory
taskmanager.numberOfTaskSlots: 10               # each task slot gets 25Mb of exclusive managed memory
cluster.fine-grained-resource-management.enabled: false

job 1:
state.backend.memory.share-scope: TASK_MANAGER
state.backend.rocksdb.memory.managed: true

job 2:
state.backend.memory.share-scope: TASK_MANAGER
state.backend.rocksdb.memory.managed: true

job 3:
state.backend.memory.share-scope: SLOT
state.backend.rocksdb.memory.managed: true

job 4:
state.backend.memory.share-scope: TASK_MANAGER
state.backend.rocksdb.memory.managed: false

job 5:
state.backend.memory.share-scope: TASK_MANAGER
state.backend.rocksdb.memory.managed: false
{code}
Jobs 1 and 2 will use the same 750Mb of managed memory and will compete with each other.
Job 3 will only use exclusive slot memory (25Mb per slot).
Jobs 4 and 5 will use the same 750Mb of unmanaged memory and will compete with each other.

Python code (or other consumers) will be able to use up to 25Mb per slot in jobs 1, 2, 4, and 5.
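
The arithmetic behind this example can be sketched in Java as follows. The class and method names are hypothetical and not part of Flink. The sketch assumes that the shared fraction is applied to the total managed memory size, that the remainder is divided equally between the slots, and that "1gb" means 1024 MiB. Under those assumptions the exact figures are 768 MiB shared and about 25.6 MiB per slot, which the example rounds to 750Mb and 25Mb.

```java
/** Hypothetical helper (not a Flink class): splits managed memory into a shared and a per-slot part. */
final class ManagedMemorySplit {
    final long sharedBytes;
    final long exclusiveBytesPerSlot;

    private ManagedMemorySplit(long sharedBytes, long exclusiveBytesPerSlot) {
        this.sharedBytes = sharedBytes;
        this.exclusiveBytesPerSlot = exclusiveBytesPerSlot;
    }

    /**
     * @param managedBytes   value of taskmanager.memory.managed.size, in bytes
     * @param sharedFraction proposed taskmanager.memory.managed.shared-fraction, in [0, 1]
     * @param numSlots       value of taskmanager.numberOfTaskSlots
     */
    static ManagedMemorySplit of(long managedBytes, double sharedFraction, int numSlots) {
        if (managedBytes < 0 || numSlots <= 0 || sharedFraction < 0.0 || sharedFraction > 1.0) {
            throw new IllegalArgumentException("invalid managed memory configuration");
        }
        // Assumption: the shared part is carved out first ...
        long shared = (long) (managedBytes * sharedFraction);
        // ... and the rest is split equally between the slots (rounded down).
        long perSlot = (managedBytes - shared) / numSlots;
        return new ManagedMemorySplit(shared, perSlot);
    }
}
```

Calling ManagedMemorySplit.of(1L << 30, 0.75, 10) corresponds to the configuration above. With a shared fraction of 0 (the proposed default) the whole managed memory stays exclusive per slot, as it is today.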

h2. Creating and sharing RocksDB objects
Introduce a sharedMemoryManager to the TaskManager.
Then, similarly to the current slot-wise sharing:
- Memory manager manages OpaqueMemoryResource
- Creation of Cache object is done from the backend code on the first call
This way, flink-runtime doesn't have to depend on the state backend.
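
A minimal Java sketch of this idea is shown below. It is not Flink's actual API: SharedResourceHolder and getOrCreate are hypothetical names chosen for illustration. The point is that the runtime side stores the resource only as an opaque Object, while the backend supplies both the concrete type and the initializer that builds it on the first call.

```java
import java.util.function.LongFunction;

/** Hypothetical runtime-side holder (not a Flink class); it knows nothing about RocksDB types. */
final class SharedResourceHolder {
    private final long sharedBytes;
    private Object resource; // opaque to the runtime; created lazily

    SharedResourceHolder(long sharedBytes) {
        this.sharedBytes = sharedBytes;
    }

    /**
     * Returns the shared resource, creating it with the given initializer on the first call.
     * Later calls ignore their initializer and return the existing instance.
     */
    synchronized <T> T getOrCreate(Class<T> type, LongFunction<T> initializer) {
        if (resource == null) {
            // Backend code decides what to allocate for the given budget.
            resource = initializer.apply(sharedBytes);
        }
        return type.cast(resource);
    }
}
```

In this sketch a backend would pass its own cache type and a factory, so that all callers within the same TM receive the same instance regardless of which job asked first.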

h2. Class loading and resolution
RocksDB state backend is already a part of the distribution.
However, if a job also includes it, then classloader.resolve-order should be set to parent-first to prevent conflicts.

h2. Lifecycle
The cache object should be destroyed on TM termination; job or task completion should NOT close it.
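
One way to express this lifecycle is sketched below in Java. All names (TmScopedResource, Lease, shutdown) are hypothetical and not taken from Flink. Tasks receive a lease whose close() is intentionally a no-op, and only the TM-level shutdown() disposes of the underlying object.

```java
import java.util.function.Consumer;

/** Hypothetical sketch (not a Flink class): a TM-scoped resource that outlives jobs and tasks. */
final class TmScopedResource<T> {
    private final T resource;
    private final Consumer<T> disposer;
    private boolean closed;

    TmScopedResource(T resource, Consumer<T> disposer) {
        this.resource = resource;
        this.disposer = disposer;
    }

    /** Hands out a lease to a task; closing the lease leaves the resource alive. */
    synchronized Lease acquire() {
        if (closed) {
            throw new IllegalStateException("resource already released by TM shutdown");
        }
        return new Lease();
    }

    /** Intended to be called only on TM termination; disposes the resource at most once. */
    synchronized void shutdown() {
        if (!closed) {
            closed = true;
            disposer.accept(resource);
        }
    }

    final class Lease implements AutoCloseable {
        T get() {
            return resource;
        }

        @Override
        public void close() {
            // Intentionally empty: job or task completion must not destroy the shared object.
        }
    }
}
```

A reference-counting scheme would free the memory as soon as the last task finishes, which is what the slot-wise sharing needs but is the opposite of the behavior described here.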

h1. Testing
One way to test that the same RocksDB cache is used is via RocksDB metrics.

h1. Limitations
- classloader.resolve-order=child-first is not supported
- fine-grained-resource-management is not supported
- only RocksDB will be able to use TM-wise shared memory; other consumers may be adjusted later

cc: [~yunta], [~ym], [~liyu]


