Posted to commits@flink.apache.org by se...@apache.org on 2020/02/03 14:16:13 UTC

[flink] 08/10: [FLINK-14495][docs] Add documentation for memory control of RocksDB state backend

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 45f534d698b8993e3c2375612200c4f429ddc25a
Author: Yun Tang <my...@live.com>
AuthorDate: Mon Dec 9 21:17:49 2019 +0800

    [FLINK-14495][docs] Add documentation for memory control of RocksDB state backend
---
 docs/ops/state/large_state_tuning.md    | 76 ++++++++++++++++++++++++++++++---
 docs/ops/state/large_state_tuning.zh.md | 76 ++++++++++++++++++++++++++++++---
 docs/ops/state/state_backends.md        |  2 +
 docs/ops/state/state_backends.zh.md     |  2 +
 4 files changed, 142 insertions(+), 14 deletions(-)

diff --git a/docs/ops/state/large_state_tuning.md b/docs/ops/state/large_state_tuning.md
index 66c440b..e76a835 100644
--- a/docs/ops/state/large_state_tuning.md
+++ b/docs/ops/state/large_state_tuning.md
@@ -122,7 +122,7 @@ Unfortunately, RocksDB's performance can vary with configuration, and there is l
 RocksDB properly. For example, the default configuration is tailored towards SSDs and performs suboptimal
 on spinning disks.
 
-**Incremental Checkpoints**
+### Incremental Checkpoints
 
 Incremental checkpoints can dramatically reduce the checkpointing time in comparison to full checkpoints, at the cost of a (potentially) longer
 recovery time. The core idea is that incremental checkpoints only record all changes to the previous completed checkpoint, instead of
@@ -138,7 +138,7 @@ by default. To enable this feature, users can instantiate a `RocksDBStateBackend
         new RocksDBStateBackend(filebackend, true);
 {% endhighlight %}
 
-**RocksDB Timers**
+### RocksDB Timers
 
 For RocksDB, a user can chose whether timers are stored on the heap or inside RocksDB (default). Heap-based timers can have a better performance for smaller numbers of
 timers, while storing timers inside RocksDB offers higher scalability as the number of timers in RocksDB can exceed the available main memory (spilling to disk).
@@ -149,7 +149,7 @@ Possible choices are `heap` (to store timers on the heap, default) and `rocksdb`
 <span class="label label-info">Note</span> *The combination RocksDB state backend with heap-based timers currently does NOT support asynchronous snapshots for the timers state.
 Other state like keyed state is still snapshotted asynchronously. Please note that this is not a regression from previous versions and will be resolved with `FLINK-10026`.*
 
-**Predefined Options**
+### Predefined Options
 
 Flink provides some predefined collections of option for RocksDB for different settings, and there existed two ways
 to pass these predefined options to RocksDB:
@@ -162,7 +162,7 @@ found a set of options that work well and seem representative for certain worklo
 
 <span class="label label-info">Note</span> Predefined options which set programmatically would override the one configured via `flink-conf.yaml`.
 
-**Passing Options Factory to RocksDB**
+### Passing Options Factory to RocksDB
 
 There existed two ways to pass options factory to RocksDB in Flink:
 
@@ -172,19 +172,20 @@ There existed two ways to pass options factory to RocksDB in Flink:
 
     {% highlight java %}
 
-    public class MyOptionsFactory implements ConfigurableOptionsFactory {
+    public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory {
 
         private static final long DEFAULT_SIZE = 256 * 1024 * 1024;  // 256 MB
         private long blockCacheSize = DEFAULT_SIZE;
 
         @Override
-        public DBOptions createDBOptions(DBOptions currentOptions) {
+        public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
             return currentOptions.setIncreaseParallelism(4)
                    .setUseFsync(false);
         }
 
         @Override
-        public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
+        public ColumnFamilyOptions createColumnOptions(
+            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
             return currentOptions.setTableFormatConfig(
                 new BlockBasedTableConfig()
                     .setBlockCacheSize(blockCacheSize)
@@ -210,6 +211,67 @@ and not from the JVM. Any memory you assign to RocksDB will have to be accounted
 of the TaskManagers by the same amount. Not doing that may result in YARN/Mesos/etc terminating the JVM processes for
 allocating more memory than configured.
 
+### Bounding RocksDB Memory Usage
+
+RocksDB allocates native memory outside of the JVM, which can lead the process to exceed its total memory budget.
+This can be especially problematic in containerized environments such as Kubernetes, which kill processes that exceed their memory budgets.
+
+By default, Flink limits the total memory usage of the RocksDB instance(s) per slot by leveraging a shareable [cache](https://github.com/facebook/rocksdb/wiki/Block-Cache)
+and [write buffer manager](https://github.com/facebook/rocksdb/wiki/Write-Buffer-Manager) among all instances in a single slot.
+The shared cache places an upper limit on the [three components](https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB) that use the majority of memory
+when RocksDB is deployed as a state backend: block cache, index and bloom filters, and MemTables.
+
+This feature is enabled by default along with managed memory. Flink will use the managed memory budget as the per-slot memory limit for RocksDB state backend(s).
+
+Flink also provides two parameters to tune the memory fractions of the MemTables and the index & filters when bounding RocksDB memory usage:
+  - `state.backend.rocksdb.memory.write-buffer-ratio`, by default `0.5`, which means 50% of the given memory is used by the write buffer manager.
+  - `state.backend.rocksdb.memory.high-prio-pool-ratio`, by default `0.1`, which means 10% of the given memory is reserved as high-priority space for index and filter blocks in the shared block cache.
+  We strongly suggest not setting this to zero, to prevent index and filter blocks from competing with data blocks for staying in the cache, which would cause performance issues.
+  Moreover, the L0-level filter and index blocks are pinned into the cache by default to mitigate performance problems;
+  for more details please refer to the [RocksDB documentation](https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-filter-and-compression-dictionary-blocks).
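+
+As a sketch with illustrative (not recommended) values, these two ratios could be adjusted in `flink-conf.yaml` like this:
+
+{% highlight yaml %}
+# give the write buffer manager 40% of the bounded memory instead of the default 50%
+state.backend.rocksdb.memory.write-buffer-ratio: 0.4
+# reserve 20% of the shared block cache as high-priority space for index & filter blocks
+state.backend.rocksdb.memory.high-prio-pool-ratio: 0.2
+{% endhighlight %}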
+
+<span class="label label-info">Note</span> When bounding RocksDB memory usage is enabled (the default),
+the shared `cache` and `write buffer manager` override any customized block cache and write buffer settings made via `PredefinedOptions` and `OptionsFactory`.
+
+*Experts only*: To control memory manually instead of using managed memory, users can set `state.backend.rocksdb.memory.managed` to `false` and configure memory via `ColumnFamilyOptions`.
+Alternatively, to save some manual calculation, the `state.backend.rocksdb.memory.fixed-per-slot` option can be used; it overrides `state.backend.rocksdb.memory.managed` when configured.
+With the latter method, please tune down `taskmanager.memory.managed.size` or `taskmanager.memory.managed.fraction` to `0`
+and increase `taskmanager.memory.task.off-heap.size` by "`taskmanager.numberOfTaskSlots` * `state.backend.rocksdb.memory.fixed-per-slot`" accordingly.
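+
+For example, assuming a hypothetical TaskManager with 4 slots and a fixed budget of 256 MB of RocksDB memory per slot, the corresponding configuration sketch would be:
+
+{% highlight yaml %}
+state.backend.rocksdb.memory.fixed-per-slot: 256mb
+taskmanager.numberOfTaskSlots: 4
+# managed memory is no longer used by the state backend in this setup
+taskmanager.memory.managed.size: 0
+# increase off-heap memory by numberOfTaskSlots * fixed-per-slot = 4 * 256mb
+taskmanager.memory.task.off-heap.size: 1gb
+{% endhighlight %}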
+
+#### Tuning performance when bounding RocksDB memory usage
+
+There might be a performance regression compared to the previous, unbounded-memory behavior if you have too many states per slot.
+- If you observe this behavior and are not running jobs in a containerized environment, or do not care about exceeding the memory limit,
+the easiest way to eliminate the performance regression is to disable the memory bound for RocksDB, i.e. to set `state.backend.rocksdb.memory.managed` to `false`.
+Moreover, please refer to the [memory configuration migration guide](WIP) to learn how to keep backward compatibility with the previous memory configuration.
+- Otherwise, you need to increase the memory available to RocksDB by tuning up `taskmanager.memory.managed.size` or `taskmanager.memory.managed.fraction`, or by increasing the total memory of the task manager.
+
+*Experts only*: Apart from increasing total memory, users can also tune RocksDB options (e.g. arena block size, max background flush threads, etc.) via a `RocksDBOptionsFactory`:
+
+{% highlight java %}
+public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory {
+
+    @Override
+    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
+        // increase the max background flush threads when we have many states in one operator,
+        // which means we would have many column families in one DB instance.
+        return currentOptions.setMaxBackgroundFlushes(4);
+    }
+
+    @Override
+    public ColumnFamilyOptions createColumnOptions(
+        ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
+        // decrease the arena block size from default 8MB to 1MB. 
+        return currentOptions.setArenaBlockSize(1024 * 1024);
+    }
+
+    @Override
+    public RocksDBOptionsFactory configure(Configuration configuration) {
+        return this;
+    }
+}
+{% endhighlight %}
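+
+The factory above could then be registered via configuration, for instance (assuming a hypothetical package `org.example`):
+
+{% highlight yaml %}
+state.backend.rocksdb.options-factory: org.example.MyOptionsFactory
+{% endhighlight %}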
+
 ## Capacity Planning
 
 This section discusses how to decide how many resources should be used for a Flink job to run reliably.
diff --git a/docs/ops/state/large_state_tuning.zh.md b/docs/ops/state/large_state_tuning.zh.md
index b29fee5..f3a7964 100644
--- a/docs/ops/state/large_state_tuning.zh.md
+++ b/docs/ops/state/large_state_tuning.zh.md
@@ -122,7 +122,7 @@ Unfortunately, RocksDB's performance can vary with configuration, and there is l
 RocksDB properly. For example, the default configuration is tailored towards SSDs and performs suboptimal
 on spinning disks.
 
-**Incremental Checkpoints**
+### Incremental Checkpoints
 
 Incremental checkpoints can dramatically reduce the checkpointing time in comparison to full checkpoints, at the cost of a (potentially) longer
 recovery time. The core idea is that incremental checkpoints only record all changes to the previous completed checkpoint, instead of
@@ -138,7 +138,7 @@ by default. To enable this feature, users can instantiate a `RocksDBStateBackend
         new RocksDBStateBackend(filebackend, true);
 {% endhighlight %}
 
-**RocksDB Timers**
+### RocksDB Timers
 
 For RocksDB, a user can chose whether timers are stored on the heap or inside RocksDB (default). Heap-based timers can have a better performance for smaller numbers of
 timers, while storing timers inside RocksDB offers higher scalability as the number of timers in RocksDB can exceed the available main memory (spilling to disk).
@@ -149,7 +149,7 @@ Possible choices are `heap` (to store timers on the heap, default) and `rocksdb`
 <span class="label label-info">Note</span> *The combination RocksDB state backend with heap-based timers currently does NOT support asynchronous snapshots for the timers state.
 Other state like keyed state is still snapshotted asynchronously. Please note that this is not a regression from previous versions and will be resolved with `FLINK-10026`.*
 
-**Predefined Options**
+### Predefined Options
 
 Flink provides some predefined collections of option for RocksDB for different settings, and there existed two ways
 to pass these predefined options to RocksDB:
@@ -162,7 +162,7 @@ found a set of options that work well and seem representative for certain worklo
 
 <span class="label label-info">Note</span> Predefined options which set programmatically would override the one configured via `flink-conf.yaml`.
 
-**Passing Options Factory to RocksDB**
+### Passing Options Factory to RocksDB
 
 There existed two ways to pass options factory to RocksDB in Flink:
 
@@ -172,19 +172,20 @@ There existed two ways to pass options factory to RocksDB in Flink:
 
     {% highlight java %}
 
-    public class MyOptionsFactory implements ConfigurableOptionsFactory {
+    public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory {
 
         private static final long DEFAULT_SIZE = 256 * 1024 * 1024;  // 256 MB
         private long blockCacheSize = DEFAULT_SIZE;
 
         @Override
-        public DBOptions createDBOptions(DBOptions currentOptions) {
+        public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
             return currentOptions.setIncreaseParallelism(4)
                    .setUseFsync(false);
         }
 
         @Override
-        public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
+        public ColumnFamilyOptions createColumnOptions(
+            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
             return currentOptions.setTableFormatConfig(
                 new BlockBasedTableConfig()
                     .setBlockCacheSize(blockCacheSize)
@@ -210,6 +211,67 @@ and not from the JVM. Any memory you assign to RocksDB will have to be accounted
 of the TaskManagers by the same amount. Not doing that may result in YARN/Mesos/etc terminating the JVM processes for
 allocating more memory than configured.
 
+### Bounding RocksDB Memory Usage
+
+RocksDB allocates native memory outside of the JVM, which can lead the process to exceed its total memory budget.
+This can be especially problematic in containerized environments such as Kubernetes, which kill processes that exceed their memory budgets.
+
+By default, Flink limits the total memory usage of the RocksDB instance(s) per slot by leveraging a shareable [cache](https://github.com/facebook/rocksdb/wiki/Block-Cache)
+and [write buffer manager](https://github.com/facebook/rocksdb/wiki/Write-Buffer-Manager) among all instances in a single slot.
+The shared cache places an upper limit on the [three components](https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB) that use the majority of memory
+when RocksDB is deployed as a state backend: block cache, index and bloom filters, and MemTables.
+
+This feature is enabled by default along with managed memory. Flink will use the managed memory budget as the per-slot memory limit for RocksDB state backend(s).
+
+Flink also provides two parameters to tune the memory fractions of the MemTables and the index & filters when bounding RocksDB memory usage:
+  - `state.backend.rocksdb.memory.write-buffer-ratio`, by default `0.5`, which means 50% of the given memory is used by the write buffer manager.
+  - `state.backend.rocksdb.memory.high-prio-pool-ratio`, by default `0.1`, which means 10% of the given memory is reserved as high-priority space for index and filter blocks in the shared block cache.
+  We strongly suggest not setting this to zero, to prevent index and filter blocks from competing with data blocks for staying in the cache, which would cause performance issues.
+  Moreover, the L0-level filter and index blocks are pinned into the cache by default to mitigate performance problems;
+  for more details please refer to the [RocksDB documentation](https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-filter-and-compression-dictionary-blocks).
+
+<span class="label label-info">Note</span> When bounding RocksDB memory usage is enabled (the default),
+the shared `cache` and `write buffer manager` override any customized block cache and write buffer settings made via `PredefinedOptions` and `OptionsFactory`.
+
+*Experts only*: To control memory manually instead of using managed memory, users can set `state.backend.rocksdb.memory.managed` to `false` and configure memory via `ColumnFamilyOptions`.
+Alternatively, to save some manual calculation, the `state.backend.rocksdb.memory.fixed-per-slot` option can be used; it overrides `state.backend.rocksdb.memory.managed` when configured.
+With the latter method, please tune down `taskmanager.memory.managed.size` or `taskmanager.memory.managed.fraction` to `0`
+and increase `taskmanager.memory.task.off-heap.size` by "`taskmanager.numberOfTaskSlots` * `state.backend.rocksdb.memory.fixed-per-slot`" accordingly.
+
+#### Tuning performance when bounding RocksDB memory usage
+
+There might be a performance regression compared to the previous, unbounded-memory behavior if you have too many states per slot.
+- If you observe this behavior and are not running jobs in a containerized environment, or do not care about exceeding the memory limit,
+the easiest way to eliminate the performance regression is to disable the memory bound for RocksDB, i.e. to set `state.backend.rocksdb.memory.managed` to `false`.
+Moreover, please refer to the [memory configuration migration guide](WIP) to learn how to keep backward compatibility with the previous memory configuration.
+- Otherwise, you need to increase the memory available to RocksDB by tuning up `taskmanager.memory.managed.size` or `taskmanager.memory.managed.fraction`, or by increasing the total memory of the task manager.
+
+*Experts only*: Apart from increasing total memory, users can also tune RocksDB options (e.g. arena block size, max background flush threads, etc.) via a `RocksDBOptionsFactory`:
+
+{% highlight java %}
+public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory {
+
+    @Override
+    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
+        // increase the max background flush threads when we have many states in one operator,
+        // which means we would have many column families in one DB instance.
+        return currentOptions.setMaxBackgroundFlushes(4);
+    }
+
+    @Override
+    public ColumnFamilyOptions createColumnOptions(
+        ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
+        // decrease the arena block size from default 8MB to 1MB. 
+        return currentOptions.setArenaBlockSize(1024 * 1024);
+    }
+
+    @Override
+    public RocksDBOptionsFactory configure(Configuration configuration) {
+        return this;
+    }
+}
+{% endhighlight %}
+
 ## Capacity Planning
 
 This section discusses how to decide how many resources should be used for a Flink job to run reliably.
diff --git a/docs/ops/state/state_backends.md b/docs/ops/state/state_backends.md
index cd0a041d..1264aec 100644
--- a/docs/ops/state/state_backends.md
+++ b/docs/ops/state/state_backends.md
@@ -123,6 +123,8 @@ RocksDBStateBackend is currently the only backend that offers incremental checkp
 
 Certain RocksDB native metrics are available but disabled by default, you can find full documentation [here]({{ site.baseurl }}/ops/config.html#rocksdb-native-metrics)
 
+The total amount of memory used by the RocksDB instance(s) per slot can also be bounded; please refer to the documentation [here](large_state_tuning.html#bounding-rocksdb-memory-usage) for details.
+
 ## Configuring a State Backend
 
 The default state backend, if you specify nothing, is the jobmanager. If you wish to establish a different default for all jobs on your cluster, you can do so by defining a new default state backend in **flink-conf.yaml**. The default state backend can be overridden on a per-job basis, as shown below.
diff --git a/docs/ops/state/state_backends.zh.md b/docs/ops/state/state_backends.zh.md
index 9960d32..eeaf247 100644
--- a/docs/ops/state/state_backends.zh.md
+++ b/docs/ops/state/state_backends.zh.md
@@ -119,6 +119,8 @@ RocksDBStateBackend 是目前唯一支持增量 CheckPoint 的 State Backend (
 
 可以使用一些 RocksDB 的本地指标(metrics),但默认是关闭的。你能在 [这里]({{ site.baseurl }}/zh/ops/config.html#rocksdb-native-metrics) 找到关于 RocksDB 本地指标的文档。
 
+The total amount of memory used by the RocksDB instance(s) per slot can also be bounded; please refer to the documentation [here](large_state_tuning.html#bounding-rocksdb-memory-usage) for details.
+
 ## 设置 State Backend
 
 如果没有明确指定,将使用 jobmanager 做为默认的 state backend。你能在 **flink-conf.yaml** 中为所有 Job 设置其他默认的 State Backend。