You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/02/03 14:16:14 UTC

[flink] 09/10: [FLINK-14495][docs] Reorganize sections of RocksDB documentation

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9afcf31fdccae6f7df31e59cf314852cfb38afd6
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri Jan 31 23:28:04 2020 +0100

    [FLINK-14495][docs] Reorganize sections of RocksDB documentation
    
    This moves most in-depth RocksDB contents to te State Backends docs.
    The "tuning large state" docs only refer to actual tuning and trouble-shooting.
---
 docs/ops/state/large_state_tuning.md    | 152 +++++++-----------------------
 docs/ops/state/large_state_tuning.zh.md | 160 +++++++-------------------------
 docs/ops/state/state_backends.md        | 145 ++++++++++++++++++++++++++++-
 3 files changed, 207 insertions(+), 250 deletions(-)

diff --git a/docs/ops/state/large_state_tuning.md b/docs/ops/state/large_state_tuning.md
index e76a835..ba98338 100644
--- a/docs/ops/state/large_state_tuning.md
+++ b/docs/ops/state/large_state_tuning.md
@@ -118,160 +118,70 @@ Other state like keyed state is still snapshotted asynchronously. Please note th
 The state storage workhorse of many large scale Flink streaming applications is the *RocksDB State Backend*.
 The backend scales well beyond main memory and reliably stores large [keyed state](../../dev/stream/state/state.html).
 
-Unfortunately, RocksDB's performance can vary with configuration, and there is little documentation on how to tune
-RocksDB properly. For example, the default configuration is tailored towards SSDs and performs suboptimal
-on spinning disks.
+RocksDB's performance can vary with configuration, this section outlines some best-practices for tuning jobs that use the RocksDB State Backend.
 
 ### Incremental Checkpoints
 
-Incremental checkpoints can dramatically reduce the checkpointing time in comparison to full checkpoints, at the cost of a (potentially) longer
-recovery time. The core idea is that incremental checkpoints only record all changes to the previous completed checkpoint, instead of
-producing a full, self-contained backup of the state backend. Like this, incremental checkpoints build upon previous checkpoints. Flink leverages
-RocksDB's internal backup mechanism in a way that is self-consolidating over time. As a result, the incremental checkpoint history in Flink
-does not grow indefinitely, and old checkpoints are eventually subsumed and pruned automatically.
+When it comes to reducing the time that checkpoints take, activating incremental checkpoints should be one of the first considerations.
+Incremental checkpoints can dramatically reduce the checkpointing time in comparison to full checkpoints, because incremental checkpoints only record the changes compared to the previous completed checkpoint, instead of producing a full, self-contained backup of the state backend.
 
-While we strongly encourage the use of incremental checkpoints for large state, please note that this is a new feature and currently not enabled 
-by default. To enable this feature, users can instantiate a `RocksDBStateBackend` with the corresponding boolean flag in the constructor set to `true`, e.g.:
+See [Incremental Checkpoints in RocksDB]({{ site.baseurl }}/ops/state/state_backends.html#incremental-checkpoints) for more background information.
 
-{% highlight java %}
-    RocksDBStateBackend backend =
-        new RocksDBStateBackend(filebackend, true);
-{% endhighlight %}
+### Timers in RocksDB or on JVM Heap
 
-### RocksDB Timers
+Timers are stored in RocksDB by default, which is the more robust and scalable choice.
 
-For RocksDB, a user can chose whether timers are stored on the heap or inside RocksDB (default). Heap-based timers can have a better performance for smaller numbers of
-timers, while storing timers inside RocksDB offers higher scalability as the number of timers in RocksDB can exceed the available main memory (spilling to disk).
+When performance-tuning jobs that have few timers only (no windows, not using timers in ProcessFunction), putting those timers on the heap can increase performance.
+Use this feature carefully, as heap-based timers may increase checkpointing times and naturally cannot scale beyond memory.
 
-When using RockDB as state backend, the type of timer storage can be selected through Flink's configuration via option key `state.backend.rocksdb.timer-service.factory`.
-Possible choices are `heap` (to store timers on the heap, default) and `rocksdb` (to store timers in RocksDB).
+See [this section]({{ site.baseurl }}/ops/state/state_backends.html#timers-heap-vs-rocksdb) for details on how to configure heap-based timers.
 
-<span class="label label-info">Note</span> *The combination RocksDB state backend with heap-based timers currently does NOT support asynchronous snapshots for the timers state.
-Other state like keyed state is still snapshotted asynchronously. Please note that this is not a regression from previous versions and will be resolved with `FLINK-10026`.*
+### Tuning RocksDB Memory
 
-### Predefined Options
+The performance of the RocksDB State Backend much depends on the amount of memory that it has available. To increase performance, adding memory can help a lot, or adjusting to which functions memory goes.
 
-Flink provides some predefined collections of option for RocksDB for different settings, and there existed two ways
-to pass these predefined options to RocksDB:
-  - Configure the predefined options through `flink-conf.yaml` via option key `state.backend.rocksdb.predefined-options`.
-    The default value of this option is `DEFAULT` which means `PredefinedOptions.DEFAULT`.
-  - Set the predefined options programmatically, e.g. `RocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM)`.
+By default, the RocksDB State Backend uses Flink's managed memory budget for RocksDBs buffers and caches (`state.backend.rocksdb.memory.managed: true`). Please refer to the [RocksDB Memory Management]({{ site.baseurl }}/ops/state/state_backends.html#memory-management) for background on how that mechanism works.
 
-We expect to accumulate more such profiles over time. Feel free to contribute such predefined option profiles when you
-found a set of options that work well and seem representative for certain workloads.
+To tune memory-related performance issues, the following steps may be helpful:
 
-<span class="label label-info">Note</span> Predefined options which set programmatically would override the one configured via `flink-conf.yaml`.
+  - The first step to try and increase performance should be to increase the amount of managed memory. This usually improves the situation a lot, without opening up the complexity of tuning low-level RocksDB options.
+    
+    Especially with large container/process sizes, much of the total memory can typically go to RocksDB, unless the application logic requires a lot of JVM heap itself. The default managed memory fraction *(0.4)* is conservative and can often be increased when using TaskManagers with multi-GB process sizes.
 
-### Passing Options Factory to RocksDB
+  - The number of write buffers in RocksDB depends on the number of states you have in your application (states across all operators in the pipeline). Each state corresponds to one ColumnFamily, which needs its own write buffers. Hence, applications with many states typically need more memory for the same performance.
 
-There existed two ways to pass options factory to RocksDB in Flink:
+  - You can try and compare the performance of RocksDB with managed memory to RocksDB with per-column-family memory by setting `state.backend.rocksdb.memory.managed: false`. Especially to test against a baseline (assuming no- or gracious container memory limits) or to test for regressions compared to earlier versions of Flink, this can be useful.
+  
+    Compared to the managed memory setup (constant memory pool), not using managed memory means that RocksDB allocates memory proportional to the number of states in the application (memory footprint changes with application changes). As a rule of thumb, the non-managed mode has (unless ColumnFamily options are applied) an upper bound of roughly "140MB * num-states-across-all-tasks * num-slots". Timers count as state as well!
 
-  - Configure options factory through `flink-conf.yaml`. You could set the options factory class name via option key `state.backend.rocksdb.options-factory`.
-    The default value for this option is `org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory`, and all candidate configurable options are defined in `RocksDBConfigurableOptions`.
-    Moreover, you could also define your customized and configurable options factory class like below and pass the class name to `state.backend.rocksdb.options-factory`.
+  - If your application has many states and you see frequent MemTable flushes (write-side bottleneck), but you cannot give more memory you can increase the ratio of memory going to the write buffers (`state.backend.rocksdb.memory.write-buffer-ratio`). See [RocksDB Memory Management]({{ site.baseurl }}/ops/state/state_backends.html#memory-management) for details.
 
-    {% highlight java %}
+  - An advanced option (*expert mode*) to reduce the number of MemTable flushes in setups with many states, is to tune RocksDB's ColumnFamily options (arena block size, max background flush threads, etc.) via a `RocksDBOptionsFactory`:
 
+    {% highlight java %}
     public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory {
-
-        private static final long DEFAULT_SIZE = 256 * 1024 * 1024;  // 256 MB
-        private long blockCacheSize = DEFAULT_SIZE;
-
+    
         @Override
         public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
-            return currentOptions.setIncreaseParallelism(4)
-                   .setUseFsync(false);
+            // increase the max background flush threads when we have many states in one operator,
+            // which means we would have many column families in one DB instance.
+            return currentOptions.setMaxBackgroundFlushes(4);
         }
-
+    
         @Override
         public ColumnFamilyOptions createColumnOptions(
             ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
-            return currentOptions.setTableFormatConfig(
-                new BlockBasedTableConfig()
-                    .setBlockCacheSize(blockCacheSize)
-                    .setBlockSize(128 * 1024));            // 128 KB
+            // decrease the arena block size from default 8MB to 1MB. 
+            return currentOptions.setArenaBlockSize(1024 * 1024);
         }
-
+    
         @Override
         public OptionsFactory configure(Configuration configuration) {
-            this.blockCacheSize =
-                configuration.getLong("my.custom.rocksdb.block.cache.size", DEFAULT_SIZE);
             return this;
         }
     }
     {% endhighlight %}
 
-  - Set the options factory programmatically, e.g. `RocksDBStateBackend.setOptions(new MyOptionsFactory());`
-
-<span class="label label-info">Note</span> Options factory which set programmatically would override the one configured via `flink-conf.yaml`,
-and options factory has a higher priority over the predefined options if ever configured or set.
-
-<span class="label label-info">Note</span> RocksDB is a native library that allocates memory directly from the process,
-and not from the JVM. Any memory you assign to RocksDB will have to be accounted for, typically by decreasing the JVM heap size
-of the TaskManagers by the same amount. Not doing that may result in YARN/Mesos/etc terminating the JVM processes for
-allocating more memory than configured.
-
-### Bounding RocksDB Memory Usage
-
-RocksDB allocates native memory outside of the JVM, which could lead the process to exceed the total memory budget.
-This can be especially problematic in containerized environments such as Kubernetes that kill processes who exceed their memory budgets.
-
-Flink limit total memory usage of RocksDB instance(s) per slot by leveraging shareable [cache](https://github.com/facebook/rocksdb/wiki/Block-Cache)
-and [write buffer manager](https://github.com/facebook/rocksdb/wiki/Write-Buffer-Manager) among all instances in a single slot by default.
-The shared cache will place an upper limit on the [three components](https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB) that use the majority of memory
-when RocksDB is deployed as a state backend: block cache, index and bloom filters, and MemTables. 
-
-This feature is enabled by default along with managed memory. Flink will use the managed memory budget as the per-slot memory limit for RocksDB state backend(s).
-
-Flink also provides two parameters to tune the memory fraction of MemTable and index & filters along with the bounding RocksDB memory usage feature:
-  - `state.backend.rocksdb.memory.write-buffer-ratio`, by default `0.5`, which means 50% of the given memory would be used by write buffer manager.
-  - `state.backend.rocksdb.memory.high-prio-pool-ratio`, by default `0.1`, which means 10% of the given memory would be set as high priority for index and filters in shared block cache.
-  We strongly suggest not to set this to zero, to prevent index and filters from competing against data blocks for staying in cache and causing performance issues.
-  Moreover, the L0 level filter and index are pinned into the cache by default to mitigate performance problems,
-  more details please refer to the [RocksDB-documentation](https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-filter-and-compression-dictionary-blocks).
-
-<span class="label label-info">Note</span> When bounded RocksDB memory usage is enabled by default,
-the shared `cache` and `write buffer manager` will override customized settings of block cache and write buffer via `PredefinedOptions` and `OptionsFactory`.
-
-*Experts only*: To control memory manually instead of using managed memory, user can set `state.backend.rocksdb.memory.managed` as `false` and control via `ColumnFamilyOptions`.
-Or to save some manual calculation, through the `state.backend.rocksdb.memory.fixed-per-slot` option which will override `state.backend.rocksdb.memory.managed` when configured.
-With the later method, please tune down `taskmanager.memory.managed.size` or `taskmanager.memory.managed.fraction` to `0` 
-and increase `taskmanager.memory.task.off-heap.size` by "`taskmanager.numberOfTaskSlots` * `state.backend.rocksdb.memory.fixed-per-slot`" accordingly.
-
-#### Tune performance when bounding RocksDB memory usage.
-
-There might existed performance regression compared with previous no-memory-limit case if you have too many states per slot.
-- If you observed this behavior and not running jobs in containerized environment or don't care about the over-limit memory usage.
-The easiest way to wipe out the performance regression is to disable memory bound for RocksDB, e.g. turn `state.backend.rocksdb.memory.managed` as `false`.
-Moreover, please refer to [memory configuration migration guide](WIP) to know how to keep backward compatibility to previous memory configuration.
-- Otherwise you need to increase the upper memory for RocksDB by tuning up `taskmanager.memory.managed.size` or `taskmanager.memory.managed.fraction`, or increasing the total memory for task manager.
-
-*Experts only*: Apart from increasing total memory, user could also tune RocksDB options (e.g. arena block size, max background flush threads, etc.) via `RocksDBOptionsFactory`:
-
-{% highlight java %}
-public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory {
-
-    @Override
-    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
-        // increase the max background flush threads when we have many states in one operator,
-        // which means we would have many column families in one DB instance.
-        return currentOptions.setMaxBackgroundFlushes(4);
-    }
-
-    @Override
-    public ColumnFamilyOptions createColumnOptions(
-        ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
-        // decrease the arena block size from default 8MB to 1MB. 
-        return currentOptions.setArenaBlockSize(1024 * 1024);
-    }
-
-    @Override
-    public OptionsFactory configure(Configuration configuration) {
-        return this;
-    }
-}
-{% endhighlight %}
-
 ## Capacity Planning
 
 This section discusses how to decide how many resources should be used for a Flink job to run reliably.
diff --git a/docs/ops/state/large_state_tuning.zh.md b/docs/ops/state/large_state_tuning.zh.md
index f3a7964..a78dc5a 100644
--- a/docs/ops/state/large_state_tuning.zh.md
+++ b/docs/ops/state/large_state_tuning.zh.md
@@ -49,7 +49,7 @@ The two numbers that are of particular interest when scaling up checkpoints are:
 
   - The time until operators start their checkpoint: This time is currently not exposed directly, but corresponds
     to:
-
+    
     `checkpoint_start_delay = end_to_end_duration - synchronous_duration - asynchronous_duration`
 
     When the time to trigger the checkpoint is constantly very high, it means that the *checkpoint barriers* need a long
@@ -118,160 +118,70 @@ Other state like keyed state is still snapshotted asynchronously. Please note th
 The state storage workhorse of many large scale Flink streaming applications is the *RocksDB State Backend*.
 The backend scales well beyond main memory and reliably stores large [keyed state](../../dev/stream/state/state.html).
 
-Unfortunately, RocksDB's performance can vary with configuration, and there is little documentation on how to tune
-RocksDB properly. For example, the default configuration is tailored towards SSDs and performs suboptimal
-on spinning disks.
+RocksDB's performance can vary with configuration, this section outlines some best-practices for tuning jobs that use the RocksDB State Backend.
 
 ### Incremental Checkpoints
 
-Incremental checkpoints can dramatically reduce the checkpointing time in comparison to full checkpoints, at the cost of a (potentially) longer
-recovery time. The core idea is that incremental checkpoints only record all changes to the previous completed checkpoint, instead of
-producing a full, self-contained backup of the state backend. Like this, incremental checkpoints build upon previous checkpoints. Flink leverages
-RocksDB's internal backup mechanism in a way that is self-consolidating over time. As a result, the incremental checkpoint history in Flink
-does not grow indefinitely, and old checkpoints are eventually subsumed and pruned automatically.
+When it comes to reducing the time that checkpoints take, activating incremental checkpoints should be one of the first considerations.
+Incremental checkpoints can dramatically reduce the checkpointing time in comparison to full checkpoints, because incremental checkpoints only record the changes compared to the previous completed checkpoint, instead of producing a full, self-contained backup of the state backend.
 
-While we strongly encourage the use of incremental checkpoints for large state, please note that this is a new feature and currently not enabled
-by default. To enable this feature, users can instantiate a `RocksDBStateBackend` with the corresponding boolean flag in the constructor set to `true`, e.g.:
+See [Incremental Checkpoints in RocksDB]({{ site.baseurl }}/ops/state/state_backends.html#incremental-checkpoints) for more background information.
 
-{% highlight java %}
-    RocksDBStateBackend backend =
-        new RocksDBStateBackend(filebackend, true);
-{% endhighlight %}
+### Timers in RocksDB or on JVM Heap
 
-### RocksDB Timers
+Timers are stored in RocksDB by default, which is the more robust and scalable choice.
 
-For RocksDB, a user can chose whether timers are stored on the heap or inside RocksDB (default). Heap-based timers can have a better performance for smaller numbers of
-timers, while storing timers inside RocksDB offers higher scalability as the number of timers in RocksDB can exceed the available main memory (spilling to disk).
+When performance-tuning jobs that have few timers only (no windows, not using timers in ProcessFunction), putting those timers on the heap can increase performance.
+Use this feature carefully, as heap-based timers may increase checkpointing times and naturally cannot scale beyond memory.
 
-When using RockDB as state backend, the type of timer storage can be selected through Flink's configuration via option key `state.backend.rocksdb.timer-service.factory`.
-Possible choices are `heap` (to store timers on the heap, default) and `rocksdb` (to store timers in RocksDB).
+See [this section]({{ site.baseurl }}/ops/state/state_backends.html#timers-heap-vs-rocksdb) for details on how to configure heap-based timers.
 
-<span class="label label-info">Note</span> *The combination RocksDB state backend with heap-based timers currently does NOT support asynchronous snapshots for the timers state.
-Other state like keyed state is still snapshotted asynchronously. Please note that this is not a regression from previous versions and will be resolved with `FLINK-10026`.*
+### Tuning RocksDB Memory
 
-### Predefined Options
+The performance of the RocksDB State Backend much depends on the amount of memory that it has available. To increase performance, adding memory can help a lot, or adjusting to which functions memory goes.
 
-Flink provides some predefined collections of option for RocksDB for different settings, and there existed two ways
-to pass these predefined options to RocksDB:
-  - Configure the predefined options through `flink-conf.yaml` via option key `state.backend.rocksdb.predefined-options`.
-    The default value of this option is `DEFAULT` which means `PredefinedOptions.DEFAULT`.
-  - Set the predefined options programmatically, e.g. `RocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM)`.
+By default, the RocksDB State Backend uses Flink's managed memory budget for RocksDBs buffers and caches (`state.backend.rocksdb.memory.managed: true`). Please refer to the [RocksDB Memory Management]({{ site.baseurl }}/ops/state/state_backends.html#memory-management) for background on how that mechanism works.
 
-We expect to accumulate more such profiles over time. Feel free to contribute such predefined option profiles when you
-found a set of options that work well and seem representative for certain workloads.
+To tune memory-related performance issues, the following steps may be helpful:
 
-<span class="label label-info">Note</span> Predefined options which set programmatically would override the one configured via `flink-conf.yaml`.
+  - The first step to try and increase performance should be to increase the amount of managed memory. This usually improves the situation a lot, without opening up the complexity of tuning low-level RocksDB options.
+    
+    Especially with large container/process sizes, much of the total memory can typically go to RocksDB, unless the application logic requires a lot of JVM heap itself. The default managed memory fraction *(0.4)* is conservative and can often be increased when using TaskManagers with multi-GB process sizes.
 
-### Passing Options Factory to RocksDB
+  - The number of write buffers in RocksDB depends on the number of states you have in your application (states across all operators in the pipeline). Each state corresponds to one ColumnFamily, which needs its own write buffers. Hence, applications with many states typically need more memory for the same performance.
 
-There existed two ways to pass options factory to RocksDB in Flink:
+  - You can try and compare the performance of RocksDB with managed memory to RocksDB with per-column-family memory by setting `state.backend.rocksdb.memory.managed: false`. Especially to test against a baseline (assuming no- or gracious container memory limits) or to test for regressions compared to earlier versions of Flink, this can be useful.
+  
+    Compared to the managed memory setup (constant memory pool), not using managed memory means that RocksDB allocates memory proportional to the number of states in the application (memory footprint changes with application changes). As a rule of thumb, the non-managed mode has (unless ColumnFamily options are applied) an upper bound of roughly "140MB * num-states-across-all-tasks * num-slots". Timers count as state as well!
 
-  - Configure options factory through `flink-conf.yaml`. You could set the options factory class name via option key `state.backend.rocksdb.options-factory`.
-    The default value for this option is `org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory`, and all candidate configurable options are defined in `RocksDBConfigurableOptions`.
-    Moreover, you could also define your customized and configurable options factory class like below and pass the class name to `state.backend.rocksdb.options-factory`.
+  - If your application has many states and you see frequent MemTable flushes (write-side bottleneck), but you cannot give more memory you can increase the ratio of memory going to the write buffers (`state.backend.rocksdb.memory.write-buffer-ratio`). See [RocksDB Memory Management]({{ site.baseurl }}/ops/state/state_backends.html#memory-management) for details.
 
-    {% highlight java %}
+  - An advanced option (*expert mode*) to reduce the number of MemTable flushes in setups with many states, is to tune RocksDB's ColumnFamily options (arena block size, max background flush threads, etc.) via a `RocksDBOptionsFactory`:
 
+    {% highlight java %}
     public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory {
-
-        private static final long DEFAULT_SIZE = 256 * 1024 * 1024;  // 256 MB
-        private long blockCacheSize = DEFAULT_SIZE;
-
+    
         @Override
         public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
-            return currentOptions.setIncreaseParallelism(4)
-                   .setUseFsync(false);
+            // increase the max background flush threads when we have many states in one operator,
+            // which means we would have many column families in one DB instance.
+            return currentOptions.setMaxBackgroundFlushes(4);
         }
-
+    
         @Override
         public ColumnFamilyOptions createColumnOptions(
             ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
-            return currentOptions.setTableFormatConfig(
-                new BlockBasedTableConfig()
-                    .setBlockCacheSize(blockCacheSize)
-                    .setBlockSize(128 * 1024));            // 128 KB
+            // decrease the arena block size from default 8MB to 1MB. 
+            return currentOptions.setArenaBlockSize(1024 * 1024);
         }
-
+    
         @Override
         public OptionsFactory configure(Configuration configuration) {
-            this.blockCacheSize =
-                configuration.getLong("my.custom.rocksdb.block.cache.size", DEFAULT_SIZE);
             return this;
         }
     }
     {% endhighlight %}
 
-  - Set the options factory programmatically, e.g. `RocksDBStateBackend.setOptions(new MyOptionsFactory());`
-
-<span class="label label-info">Note</span> Options factory which set programmatically would override the one configured via `flink-conf.yaml`,
-and options factory has a higher priority over the predefined options if ever configured or set.
-
-<span class="label label-info">Note</span> RocksDB is a native library that allocates memory directly from the process,
-and not from the JVM. Any memory you assign to RocksDB will have to be accounted for, typically by decreasing the JVM heap size
-of the TaskManagers by the same amount. Not doing that may result in YARN/Mesos/etc terminating the JVM processes for
-allocating more memory than configured.
-
-### Bounding RocksDB Memory Usage
-
-RocksDB allocates native memory outside of the JVM, which could lead the process to exceed the total memory budget.
-This can be especially problematic in containerized environments such as Kubernetes that kill processes who exceed their memory budgets.
-
-Flink limit total memory usage of RocksDB instance(s) per slot by leveraging shareable [cache](https://github.com/facebook/rocksdb/wiki/Block-Cache)
-and [write buffer manager](https://github.com/facebook/rocksdb/wiki/Write-Buffer-Manager) among all instances in a single slot by default.
-The shared cache will place an upper limit on the [three components](https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB) that use the majority of memory
-when RocksDB is deployed as a state backend: block cache, index and bloom filters, and MemTables. 
-
-This feature is enabled by default along with managed memory. Flink will use the managed memory budget as the per-slot memory limit for RocksDB state backend(s).
-
-Flink also provides two parameters to tune the memory fraction of MemTable and index & filters along with the bounding RocksDB memory usage feature:
-  - `state.backend.rocksdb.memory.write-buffer-ratio`, by default `0.5`, which means 50% of the given memory would be used by write buffer manager.
-  - `state.backend.rocksdb.memory.high-prio-pool-ratio`, by default `0.1`, which means 10% of the given memory would be set as high priority for index and filters in shared block cache.
-  We strongly suggest not to set this to zero, to prevent index and filters from competing against data blocks for staying in cache and causing performance issues.
-  Moreover, the L0 level filter and index are pinned into the cache by default to mitigate performance problems,
-  more details please refer to the [RocksDB-documentation](https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-filter-and-compression-dictionary-blocks).
-
-<span class="label label-info">Note</span> When bounded RocksDB memory usage is enabled by default,
-the shared `cache` and `write buffer manager` will override customized settings of block cache and write buffer via `PredefinedOptions` and `OptionsFactory`.
-
-*Experts only*: To control memory manually instead of using managed memory, user can set `state.backend.rocksdb.memory.managed` as `false` and control via `ColumnFamilyOptions`.
-Or to save some manual calculation, through the `state.backend.rocksdb.memory.fixed-per-slot` option which will override `state.backend.rocksdb.memory.managed` when configured.
-With the later method, please tune down `taskmanager.memory.managed.size` or `taskmanager.memory.managed.fraction` to `0` 
-and increase `taskmanager.memory.task.off-heap.size` by "`taskmanager.numberOfTaskSlots` * `state.backend.rocksdb.memory.fixed-per-slot`" accordingly.
-
-#### Tune performance when bounding RocksDB memory usage.
-
-There might existed performance regression compared with previous no-memory-limit case if you have too many states per slot.
-- If you observed this behavior and not running jobs in containerized environment or don't care about the over-limit memory usage.
-The easiest way to wipe out the performance regression is to disable memory bound for RocksDB, e.g. turn `state.backend.rocksdb.memory.managed` as `false`.
-Moreover, please refer to [memory configuration migration guide](WIP) to know how to keep backward compatibility to previous memory configuration.
-- Otherwise you need to increase the upper memory for RocksDB by tuning up `taskmanager.memory.managed.size` or `taskmanager.memory.managed.fraction`, or increasing the total memory for task manager.
-
-*Experts only*: Apart from increasing total memory, user could also tune RocksDB options (e.g. arena block size, max background flush threads, etc.) via `RocksDBOptionsFactory`:
-
-{% highlight java %}
-public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory {
-
-    @Override
-    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
-        // increase the max background flush threads when we have many states in one operator,
-        // which means we would have many column families in one DB instance.
-        return currentOptions.setMaxBackgroundFlushes(4);
-    }
-
-    @Override
-    public ColumnFamilyOptions createColumnOptions(
-        ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
-        // decrease the arena block size from default 8MB to 1MB. 
-        return currentOptions.setArenaBlockSize(1024 * 1024);
-    }
-
-    @Override
-    public OptionsFactory configure(Configuration configuration) {
-        return this;
-    }
-}
-{% endhighlight %}
-
 ## Capacity Planning
 
 This section discusses how to decide how many resources should be used for a Flink job to run reliably.
@@ -292,7 +202,7 @@ The basic rules of thumb for capacity planning are:
   - Temporary back pressure is usually okay, and an essential part of execution flow control during load spikes,
     during catch-up phases, or when external systems (that are written to in a sink) exhibit temporary slowdown.
 
-  - Certain operations (like large windows) result in a spiky load for their downstream operators:
+  - Certain operations (like large windows) result in a spiky load for their downstream operators: 
     In the case of windows, the downstream operators may have little to do while the window is being built,
     and have a load to do when the windows are emitted.
     The planning for the downstream parallelism needs to take into account how much the windows emit and how
@@ -308,10 +218,10 @@ executing the program with a low parallelism.
 
 ## Compression
 
-Flink offers optional compression (default: off) for all checkpoints and savepoints. Currently, compression always uses
+Flink offers optional compression (default: off) for all checkpoints and savepoints. Currently, compression always uses 
 the [snappy compression algorithm (version 1.1.4)](https://github.com/xerial/snappy-java) but we are planning to support
 custom compression algorithms in the future. Compression works on the granularity of key-groups in keyed state, i.e.
-each key-group can be decompressed individually, which is important for rescaling.
+each key-group can be decompressed individually, which is important for rescaling. 
 
 Compression can be activated through the `ExecutionConfig`:
 
diff --git a/docs/ops/state/state_backends.md b/docs/ops/state/state_backends.md
index 1264aec..5bc0afb 100644
--- a/docs/ops/state/state_backends.md
+++ b/docs/ops/state/state_backends.md
@@ -37,7 +37,7 @@ chosen **State Backend**.
 * ToC
 {:toc}
 
-## Available State Backends
+# Available State Backends
 
 Out of the box, Flink bundles these state backends:
 
@@ -125,7 +125,7 @@ Certain RocksDB native metrics are available but disabled by default, you can fi
 
 The total memory amount of RocksDB instance(s) per slot can also be bounded, please refer to documentation [here](large_state_tuning.html#bounding-rocksdb-memory-usage) for details.
 
-## Configuring a State Backend
+# Configuring a State Backend
 
 The default state backend, if you specify nothing, is the jobmanager. If you wish to establish a different default for all jobs on your cluster, you can do so by defining a new default state backend in **flink-conf.yaml**. The default state backend can be overridden on a per-job basis, as shown below.
 
@@ -188,8 +188,145 @@ state.backend: filesystem
 state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
 {% endhighlight %}
 
-#### RocksDB State Backend Config Options
 
-{% include generated/rocks_db_configuration.html %}
+# RocksDB State Backend Details
+
+*This section describes the RocksDB state backend in more detail.*
+
+### Incremental Checkpoints
+
+RocksDB supports *Incremental Checkpoints*, which can dramatically reduce the checkpointing time in comparison to full checkpoints.
+Instead of producing a full, self-contained backup of the state backend, incremental checkpoints only record the changes that happened since the latest completed checkpoint.
+
+An incremental checkpoint builds upon (typically multiple) previous checkpoints. Flink leverages RocksDB's internal compaction mechanism in a way that is self-consolidating over time. As a result, the incremental checkpoint history in Flink does not grow indefinitely, and old checkpoints are eventually subsumed and pruned automatically.
+
+Recovery time of incremental checkpoints may be longer or shorter compared to full checkpoints. If your network bandwidth is the bottleneck, it may take a bit longer to restore from an incremental checkpoint, because it implies fetching more data (more deltas). Restoring from an incremental checkpoint is faster, if the bottleneck is your CPU or IOPs, because restoring from an incremental checkpoint means not re-building the local RocksDB tables from Flink's canonical key/value snapshot f [...]
+
+While we encourage the use of incremental checkpoints for large state, you need to enable this feature manually:
+  - Setting a default in your `flink-conf.yaml`: `state.backend.incremental: true` will enable incremental checkpoints, unless the application overrides this setting in the code.
+  - You can alternatively configure this directly in the code (overrides the config default): `RocksDBStateBackend backend = new RocksDBStateBackend(checkpointDirURI, true);`
+
+### Memory Management
+
+Flink aims to control the total process memory consumption to make sure that the Flink TaskManagers have a well-behaved memory footprint. That means staying within the limits enforced by the environment (Docker/Kubernetes, Yarn, etc) to not get killed for consuming too much memory, but also to not under-utilize memory (unnecessary spilling to disk, wasted caching opportunities, reduced performance).
+
+To achieve that, Flink by default configures RocksDB's memory allocation to the amount of managed memory of the TaskManager (or, more precisely, task slot). This should give good out-of-the-box experience for most applications, meaning most applications should not need to tune any of the detailed RocksDB settings. The primary mechanism for improving memory-related performance issues would be to simply increase Flink's managed memory.
+
+Users can choose to deactivate that feature and let RocksDB allocate memory independently per ColumnFamily (one per state per operator). This offers expert users ultimately more fine grained control over RocksDB, but means that users need to take care themselves that the overall memory consumption does not exceed the limits of the environment. See [large state tuning]({{ site.baseurl }}/ops/state/large_state_tuning.html#tuning-rocksdb-memory) for some guideline about large state performa [...]
+
+**Managed Memory for RocksDB**
+
+This feature is active by default and can be (de)activated via the `state.backend.rocksdb.memory.managed` configuration key.
+
+Flink does not directly manage RocksDB's native memory allocations, but configures RocksDB in a certain way to ensure it uses exactly as much memory as Flink has for its managed memory budget. This is done on a per-slot level (managed memory is accounted per slot).
+
+To set the total memory usage of RocksDB instance(s), Flink leverages a shared [cache](https://github.com/facebook/rocksdb/wiki/Block-Cache)
+and [write buffer manager](https://github.com/facebook/rocksdb/wiki/Write-Buffer-Manager) among all instances in a single slot. 
+The shared cache will place an upper limit on the [three components](https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB) that use the majority of memory
+in RocksDB: block cache, index and bloom filters, and MemTables.
+
+For advanced tuning, Flink also provides two parameters to control the division of memory between the *write path* (MemTable) and *read path* (index & filters, remaining cache). When you see that RocksDB performs badly due to lack of write buffer memory (frequent flushes) or cache misses, you can use these parameters to redistribute the memory.
+
+  - `state.backend.rocksdb.memory.write-buffer-ratio`, by default `0.5`, which means 50% of the given memory would be used by write buffer manager.
+  - `state.backend.rocksdb.memory.high-prio-pool-ratio`, by default `0.1`, which means 10% of the given memory would be set as high priority for index and filters in shared block cache.
+  We strongly suggest not to set this to zero, to prevent index and filters from competing against data blocks for staying in cache and causing performance issues.
+  Moreover, the L0 level filter and index are pinned into the cache by default to mitigate performance problems,
+  more details please refer to the [RocksDB-documentation](https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-filter-and-compression-dictionary-blocks).
+
+<span class="label label-info">Note</span> When the above described mechanism (`cache` and `write buffer manager`) is enabled, it will override any customized settings for block caches and write buffers done via [`PredefinedOptions`](#predefined-per-columnfamily-options) and [`OptionsFactory`](#passing-options-factory-to-rocksdb).
+
+<span class="label label-info">Note</span> *Expert Mode*: To control memory manually, you can set `state.backend.rocksdb.memory.managed` to `false` and configure RocksDB via [`ColumnFamilyOptions`](#passing-options-factory-to-rocksdb). Alternatively, you can use the above mentioned cache/buffer-manager mechanism, but set the memory size to a fixed amount independent of Flink's managed memory size (`state.backend.rocksdb.memory.fixed-per-slot` option). Note that in both cases, users need  [...]
+
+### Timers (Heap vs. RocksDB)
+
+Timers are used to schedule actions for later (event-time or processing-time), such as firing a window, or calling back a `ProcessFunction`.
+
+When selecting the RocksDB State Backend, timers are by default also stored in RocksDB. That is a robust and scalable way that lets applications scale to many timers. However, maintaining timers in RocksDB can have a certain cost, which is why Flink provides the option to store timers on the JVM heap instead, even when RocksDB is used to store other states. Heap-based timers can have a better performance when there is a smaller number of timers.
+
+Set the configuration option `state.backend.rocksdb.timer-service.factory` to `heap` (rather than the default, `rocksdb`) to store timers on heap.
+
+<span class="label label-info">Note</span> *The combination RocksDB state backend with heap-based timers currently does NOT support asynchronous snapshots for the timers state. Other state like keyed state is still snapshotted asynchronously.*
+
+### Enabling RocksDB Native Metrics
+
+You can optionally access RockDB's native metrics through Flink's metrics system, by enabling certain metrics selectively.
+See [configuration docs]({{ site.baseurl }}/ops/config.html#rocksdb-native-metrics) for details.
+
+<div class="alert alert-warning">
+  <strong>Note:</strong> Enabling RocksDB's native metrics may have a negative performance impact on your application.
+</div>
+
+### Predefined Per-ColumnFamily Options
+
+<span class="label label-info">Note</span> With the introduction of [memory management for RocksDB](#memory-management) this mechanism should be mainly used for *expert tuning* or *trouble shooting*.
+
+With *Predefined Options*, users can apply some predefined config profiles on each RocksDB Column Family, configuring for example memory use, thread, compaction settings, etc. There is currently one Column Family per each state in each operator.
+
+There are two ways to select predefined options to be applied:
+  - Set the option's name in `flink-conf.yaml` via `state.backend.rocksdb.predefined-options`.
+  - Set the predefined options programmatically: `RocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM)`.
+
+The default value for this option is `DEFAULT` which translates to `PredefinedOptions.DEFAULT`.
+
+<span class="label label-info">Note</span> Predefined options set programmatically would override the ones configured via `flink-conf.yaml`.
+
+### Passing Options Factory to RocksDB
+
+<span class="label label-info">Note</span> With the introduction of [memory management for RocksDB](#memory-management) this mechanism should be mainly used for *expert tuning* or *trouble shooting*.
+
+To manually control RocksDB's options, you need to configure an `RocksDBOptionsFactory`. This mechanism gives you fine-grained control over the settings of the Column Families, for example memory use, thread, compaction settings, etc. There is currently one Column Family per each state in each operator.
+
+There are two ways to pass an OptionsFactory to the RocksDB State Backend:
+
+  - Configure options factory class name in the `flink-conf.yaml` via `state.backend.rocksdb.options-factory`.
+  
+  - Set the options factory programmatically, e.g. `RocksDBStateBackend.setOptions(new MyOptionsFactory());`
+
+<span class="label label-info">Note</span> Options factory which set programmatically would override the one configured via `flink-conf.yaml`,
+and options factory has a higher priority over the predefined options if ever configured or set.
+
+<span class="label label-info">Note</span> RocksDB is a native library that allocates memory directly from the process,
+and not from the JVM. Any memory you assign to RocksDB will have to be accounted for, typically by decreasing the JVM heap size
+of the TaskManagers by the same amount. Not doing that may result in YARN/Mesos/etc terminating the JVM processes for
+allocating more memory than configured.
+
+**Reading Column Family Options from flink-conf.yaml**
+
+When an `OptionsFactory` implements the `ConfigurableRocksDBOptionsFactory` interface, it can directly read settings from the configuration (`flink-conf.yaml`).
+
+The default value for `state.backend.rocksdb.options-factory` is in fact `org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory` which picks up all config options [defined here]({{ site.baseurl }}/ops/config.html#advanced-rocksdb-state-backends-options) by default. Hence, you can configure low-level Column Family options simply by turning off managed memory for RocksDB and putting the relevant entries in the configuration.
+
+Below is an example how to define a custom ConfigurableOptionsFactory (set class name under `state.backend.rocksdb.options-factory`).
+
+    {% highlight java %}
+
+    public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory {
+
+        private static final long DEFAULT_SIZE = 256 * 1024 * 1024;  // 256 MB
+        private long blockCacheSize = DEFAULT_SIZE;
+
+        @Override
+        public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
+            return currentOptions.setIncreaseParallelism(4)
+                   .setUseFsync(false);
+        }
+
+        @Override
+        public ColumnFamilyOptions createColumnOptions(
+            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
+            return currentOptions.setTableFormatConfig(
+                new BlockBasedTableConfig()
+                    .setBlockCacheSize(blockCacheSize)
+                    .setBlockSize(128 * 1024));            // 128 KB
+        }
+
+        @Override
+        public OptionsFactory configure(Configuration configuration) {
+            this.blockCacheSize =
+                configuration.getLong("my.custom.rocksdb.block.cache.size", DEFAULT_SIZE);
+            return this;
+        }
+    }
+    {% endhighlight %}
 
 {% top %}