You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2022/01/28 20:25:43 UTC

[flink] branch master updated: [FLINK-25024][docs] Add Changelog backend docs

This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f74a28b  [FLINK-25024][docs] Add Changelog backend docs
f74a28b is described below

commit f74a28b81447955c4d696c5d11494137adc12aaa
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Thu Jan 20 19:26:04 2022 +0100

    [FLINK-25024][docs] Add Changelog backend docs
---
 docs/content.zh/docs/deployment/config.md          |  11 ++
 docs/content.zh/docs/ops/metrics.md                |  53 +++++++++
 docs/content.zh/docs/ops/state/state_backends.md   | 125 +++++++++++++++++++++
 docs/content/docs/deployment/config.md             |  11 ++
 docs/content/docs/ops/metrics.md                   |  53 +++++++++
 docs/content/docs/ops/state/state_backends.md      | 123 ++++++++++++++++++++
 .../fs_state_changelog_configuration.html          |  84 ++++++++++++++
 flink-docs/pom.xml                                 |   8 +-
 .../configuration/ConfigOptionsDocGenerator.java   |   4 +-
 9 files changed, 470 insertions(+), 2 deletions(-)

diff --git a/docs/content.zh/docs/deployment/config.md b/docs/content.zh/docs/deployment/config.md
index aa80c3e..a96ea32 100644
--- a/docs/content.zh/docs/deployment/config.md
+++ b/docs/content.zh/docs/deployment/config.md
@@ -339,6 +339,17 @@ Advanced options to tune RocksDB and RocksDB checkpoints.
 
 {{< generated/expert_rocksdb_section >}}
 
+### State Changelog Options
+
+Please refer to [State Backends]({{< ref "docs/ops/state/state_backends#enabling-changelog" >}}) for information on using State Changelog.
+{{< hint warning >}} The feature is in experimental status. {{< /hint >}}
+{{< generated/state_backend_changelog_section >}}
+
+#### FileSystem-based Changelog options
+
+These settings take effect when `state.backend.changelog.storage` is set to `filesystem` (see [above](#state-backend-changelog-storage)).
+{{< generated/fs_state_changelog_configuration >}}
+
 **RocksDB Configurable Options**
 
 These options give fine-grained control over the behavior and resources of ColumnFamilies.
diff --git a/docs/content.zh/docs/ops/metrics.md b/docs/content.zh/docs/ops/metrics.md
index c051239..c01ad7f 100644
--- a/docs/content.zh/docs/ops/metrics.md
+++ b/docs/content.zh/docs/ops/metrics.md
@@ -1217,6 +1217,59 @@ Note that for failed checkpoints, metrics are updated on a best efforts basis an
 ### RocksDB
 Certain RocksDB native metrics are available but disabled by default, you can find full documentation [here]({{< ref "docs/deployment/config" >}}#rocksdb-native-metrics)
 
+### State Changelog
+
+Note that the metrics are only available via reporters.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 18%">Scope</th>
+      <th class="text-left" style="width: 26%">Metrics</th>
+      <th class="text-left" style="width: 48%">Description</th>
+      <th class="text-left" style="width: 8%">Type</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <th rowspan="7"><strong>Job (only available on TaskManager)</strong></th>
+      <td>numberOfUploadRequests</td>
+      <td>Total number of upload requests made</td>
+      <td>Counter</td>
+    </tr>
+    <tr>
+      <td>numberOfUploadFailures</td>
+      <td>Total number of failed upload requests (request may be retried after the failure)</td>
+      <td>Counter</td>
+    </tr>
+    <tr>
+      <td>attemptsPerUpload</td>
+      <td>The number of attempts per upload</td>
+      <td>Histogram</td>
+    </tr>
+    <tr>
+      <td>uploadBatchSizes</td>
+      <td>The number of upload tasks (coming from one or more writers, i.e. backends/tasks) that were grouped together to form a single upload resulting in a single file</td>
+      <td>Histogram</td>
+    </tr>
+    <tr>
+      <td>uploadLatenciesNanos</td>
+      <td>The latency distributions of uploads</td>
+      <td>Histogram</td>
+    </tr>
+    <tr>
+      <td>uploadSizes</td>
+      <td>The size distributions of uploads</td>
+      <td>Histogram</td>
+    </tr>
+    <tr>
+      <td>uploadQueueSize</td>
+      <td>Current size of upload queue. Queue items can be packed together and form a single upload.</td>
+      <td>Gauge</td>
+    </tr>
+  </tbody>
+</table>
+
 ### IO
 <table class="table table-bordered">
   <thead>
diff --git a/docs/content.zh/docs/ops/state/state_backends.md b/docs/content.zh/docs/ops/state/state_backends.md
index 0cf29c8..38575f6 100644
--- a/docs/content.zh/docs/ops/state/state_backends.md
+++ b/docs/content.zh/docs/ops/state/state_backends.md
@@ -303,6 +303,131 @@ public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory {
 
 {{< top >}}
 
+## Enabling Changelog
+
+{{< hint warning >}} This feature is in experimental status. {{< /hint >}}
+
+{{< hint warning >}} Enabling Changelog may have a negative performance impact on your application (see below). {{< /hint >}}
+
+### Introduction
+
+Changelog is a feature that aims to decrease checkpointing time and, therefore, end-to-end latency in exactly-once mode.
+
+Most commonly, checkpoint duration is affected by:
+
+1. Barrier travel time and alignment, addressed by
+   [Unaligned checkpoints]({{< ref "docs/ops/state/checkpointing_under_backpressure#unaligned-checkpoints" >}})
+   and [Buffer debloating]({{< ref "docs/ops/state/checkpointing_under_backpressure#buffer-debloating" >}})
+2. Snapshot creation time (so-called synchronous phase), addressed by asynchronous snapshots (mentioned [above]({{<
+   ref "#the-embeddedrocksdbstatebackend">}}))
+3. Snapshot upload time (asynchronous phase)
+
+Upload time can be decreased by [incremental checkpoints]({{< ref "#incremental-checkpoints" >}}).
+However, most incremental state backends perform some form of compaction periodically, which results in re-uploading the
+old state in addition to the new changes. In large deployments, the probability of at least one task uploading lots of
+data tends to be very high in every checkpoint.
+
+With Changelog enabled, Flink uploads state changes continuously and forms a changelog. On checkpoint, only the relevant
+part of this changelog needs to be uploaded. The configured state backend is snapshotted in the
+background periodically. Upon successful upload, the changelog is truncated.
+
+As a result, the duration of the asynchronous phase is reduced, and so is that of the synchronous phase, because no data
+needs to be flushed to disk. In particular, long-tail latency is improved.
+
+However, resource usage is higher:
+
+- more files are created on DFS
+- more files can be left undeleted on DFS (this will be addressed in future versions in FLINK-25511 and FLINK-25512)
+- more IO bandwidth is used to upload state changes
+- more CPU used to serialize state changes
+- more memory used by Task Managers to buffer state changes
+
+Recovery time is another thing to consider. Depending on the `state.backend.changelog.periodic-materialize.interval`
+setting, the changelog can become lengthy and replaying it may take more time. However, recovery time combined with
+checkpoint duration will likely still be lower than in non-changelog setups, providing lower end-to-end latency even in
+the failover case. That said, it's also possible that the effective recovery time will increase, depending on the actual ratio
+of the aforementioned times.
+
+For more details, see [FLIP-158](https://cwiki.apache.org/confluence/display/FLINK/FLIP-158%3A+Generalized+incremental+checkpoints).
+
+### Installation
+
+Changelog JARs are included in the standard Flink distribution.
+
+Make sure to [add]({{< ref "docs/deployment/filesystems/overview" >}}) the necessary filesystem plugins.
+
+### Configuration
+
+Here is an example configuration in YAML:
+```yaml
+state.backend.changelog.enabled: true
+state.backend.changelog.storage: filesystem # currently, only filesystem and memory (for tests) are supported
+dstl.dfs.base-path: s3://<bucket-name> # similar to state.checkpoints.dir
+```
+
+Please keep the following defaults (see [limitations](#limitations)):
+```yaml
+execution.checkpointing.max-concurrent-checkpoints: 1
+state.backend.local-recovery: false
+```
+
+Please refer to the [configuration section]({{< ref "docs/deployment/config#state-changelog-options" >}}) for other options.
+
+Changelog can also be enabled or disabled per job programmatically:
+{{< tabs  >}}
+{{< tab "Java" >}}
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableChangelogStateBackend(true);
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableChangelogStateBackend(true)
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+env = StreamExecutionEnvironment.get_execution_environment()
+env.enable_changelog_statebackend(True)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+### Monitoring
+
+Available metrics are listed [here]({{< ref "docs/ops/metrics#state-changelog" >}}).
+
+If a task is backpressured by writing state changes, it will be shown as busy (red) in the UI.
+
+### Upgrading existing jobs
+
+**Enabling Changelog**
+
+Resuming from both savepoints and checkpoints is supported:
+- given an existing non-changelog job
+- take either a [savepoint]({{< ref "docs/ops/state/savepoints#resuming-from-savepoints" >}}) or a [checkpoint]({{< ref "docs/ops/state/checkpoints#resuming-from-a-retained-checkpoint" >}})
+- alter configuration (enable Changelog)
+- resume from the taken snapshot
+
+**Disabling Changelog**
+
+Resuming only from [savepoints]({{< ref "docs/ops/state/savepoints#resuming-from-savepoints" >}})
+is supported. Resuming from [checkpoints]({{<  ref "docs/ops/state/checkpoints#resuming-from-a-retained-checkpoint" >}})
+is planned for future versions.
+
+**State migration** (including changing TTL) is currently not supported.
+
+### Limitations
+ - At most one concurrent checkpoint
+ - Local recovery not supported
+ - As of Flink 1.15, only `filesystem` changelog implementation is available
+ - State migration (including changing TTL) is currently not supported
+ - [NO_CLAIM]({{< ref "docs/deployment/config#execution-savepoint-restore-mode" >}}) mode not supported
+
+{{< top >}}
+
 ## 自旧版本迁移
 
 从 **Flink 1.13** 版本开始,社区改进了 state backend 的公开类,进而帮助用户更好理解本地状态存储和 checkpoint 存储的区分。
diff --git a/docs/content/docs/deployment/config.md b/docs/content/docs/deployment/config.md
index d6bd80c..39e01f7 100644
--- a/docs/content/docs/deployment/config.md
+++ b/docs/content/docs/deployment/config.md
@@ -341,6 +341,17 @@ Advanced options to tune RocksDB and RocksDB checkpoints.
 
 {{< generated/expert_rocksdb_section >}}
 
+### State Changelog Options
+
+Please refer to [State Backends]({{< ref "docs/ops/state/state_backends#enabling-changelog" >}}) for information on using State Changelog.
+{{< hint warning >}} The feature is in experimental status. {{< /hint >}}
+{{< generated/state_backend_changelog_section >}}
+
+#### FileSystem-based Changelog options
+
+These settings take effect when `state.backend.changelog.storage` is set to `filesystem` (see [above](#state-backend-changelog-storage)).
+{{< generated/fs_state_changelog_configuration >}}
+
 **RocksDB Configurable Options**
 
 These options give fine-grained control over the behavior and resources of ColumnFamilies.
diff --git a/docs/content/docs/ops/metrics.md b/docs/content/docs/ops/metrics.md
index c28ec45..237bc4a 100644
--- a/docs/content/docs/ops/metrics.md
+++ b/docs/content/docs/ops/metrics.md
@@ -1210,6 +1210,59 @@ Note that for failed checkpoints, metrics are updated on a best efforts basis an
 ### RocksDB
 Certain RocksDB native metrics are available but disabled by default, you can find full documentation [here]({{< ref "docs/deployment/config" >}}#rocksdb-native-metrics)
 
+### State Changelog
+
+Note that the metrics are only available via reporters.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 18%">Scope</th>
+      <th class="text-left" style="width: 26%">Metrics</th>
+      <th class="text-left" style="width: 48%">Description</th>
+      <th class="text-left" style="width: 8%">Type</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <th rowspan="7"><strong>Job (only available on TaskManager)</strong></th>
+      <td>numberOfUploadRequests</td>
+      <td>Total number of upload requests made</td>
+      <td>Counter</td>
+    </tr>
+    <tr>
+      <td>numberOfUploadFailures</td>
+      <td>Total number of failed upload requests (request may be retried after the failure)</td>
+      <td>Counter</td>
+    </tr>
+    <tr>
+      <td>attemptsPerUpload</td>
+      <td>The number of attempts per upload</td>
+      <td>Histogram</td>
+    </tr>
+    <tr>
+      <td>uploadBatchSizes</td>
+      <td>The number of upload tasks (coming from one or more writers, i.e. backends/tasks) that were grouped together to form a single upload resulting in a single file</td>
+      <td>Histogram</td>
+    </tr>
+    <tr>
+      <td>uploadLatenciesNanos</td>
+      <td>The latency distributions of uploads</td>
+      <td>Histogram</td>
+    </tr>
+    <tr>
+      <td>uploadSizes</td>
+      <td>The size distributions of uploads</td>
+      <td>Histogram</td>
+    </tr>
+    <tr>
+      <td>uploadQueueSize</td>
+      <td>Current size of upload queue. Queue items can be packed together and form a single upload.</td>
+      <td>Gauge</td>
+    </tr>
+  </tbody>
+</table>
+
 ### IO
 <table class="table table-bordered">
   <thead>
diff --git a/docs/content/docs/ops/state/state_backends.md b/docs/content/docs/ops/state/state_backends.md
index 6dd07af..8690e3d 100644
--- a/docs/content/docs/ops/state/state_backends.md
+++ b/docs/content/docs/ops/state/state_backends.md
@@ -325,6 +325,129 @@ public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory {
 
 {{< top >}}
 
+## Enabling Changelog
+
+{{< hint warning >}} This feature is in experimental status. {{< /hint >}}
+
+{{< hint warning >}} Enabling Changelog may have a negative performance impact on your application (see below). {{< /hint >}}
+
+### Introduction
+
+Changelog is a feature that aims to decrease checkpointing time and, therefore, end-to-end latency in exactly-once mode.
+
+Most commonly, checkpoint duration is affected by:
+
+1. Barrier travel time and alignment, addressed by
+   [Unaligned checkpoints]({{< ref "docs/ops/state/checkpointing_under_backpressure#unaligned-checkpoints" >}})
+   and [Buffer debloating]({{< ref "docs/ops/state/checkpointing_under_backpressure#buffer-debloating" >}})
+2. Snapshot creation time (so-called synchronous phase), addressed by asynchronous snapshots (mentioned [above]({{<
+   ref "#the-embeddedrocksdbstatebackend">}}))
+3. Snapshot upload time (asynchronous phase)
+
+Upload time can be decreased by [incremental checkpoints]({{< ref "#incremental-checkpoints" >}}).
+However, most incremental state backends perform some form of compaction periodically, which results in re-uploading the
+old state in addition to the new changes. In large deployments, the probability of at least one task uploading lots of
+data tends to be very high in every checkpoint.
+
+With Changelog enabled, Flink uploads state changes continuously and forms a changelog. On checkpoint, only the relevant
+part of this changelog needs to be uploaded. The configured state backend is snapshotted in the
+background periodically. Upon successful upload, the changelog is truncated.
+
+As a result, the duration of the asynchronous phase is reduced, and so is that of the synchronous phase, because no data
+needs to be flushed to disk. In particular, long-tail latency is improved.
+
+However, resource usage is higher:
+
+- more files are created on DFS
+- more files can be left undeleted on DFS (this will be addressed in future versions in FLINK-25511 and FLINK-25512)
+- more IO bandwidth is used to upload state changes
+- more CPU used to serialize state changes
+- more memory used by Task Managers to buffer state changes
+
+Recovery time is another thing to consider. Depending on the `state.backend.changelog.periodic-materialize.interval`
+setting, the changelog can become lengthy and replaying it may take more time. However, recovery time combined with
+checkpoint duration will likely still be lower than in non-changelog setups, providing lower end-to-end latency even in
+the failover case. That said, it's also possible that the effective recovery time will increase, depending on the actual ratio
+of the aforementioned times.
+
+For more details, see [FLIP-158](https://cwiki.apache.org/confluence/display/FLINK/FLIP-158%3A+Generalized+incremental+checkpoints).
+
+### Installation
+
+Changelog JARs are included in the standard Flink distribution.
+
+Make sure to [add]({{< ref "docs/deployment/filesystems/overview" >}}) the necessary filesystem plugins.
+
+### Configuration
+
+Here is an example configuration in YAML:
+```yaml
+state.backend.changelog.enabled: true
+state.backend.changelog.storage: filesystem # currently, only filesystem and memory (for tests) are supported
+dstl.dfs.base-path: s3://<bucket-name> # similar to state.checkpoints.dir
+```
+
+Please keep the following defaults (see [limitations](#limitations)):
+```yaml
+execution.checkpointing.max-concurrent-checkpoints: 1
+state.backend.local-recovery: false
+```
+
+Please refer to the [configuration section]({{< ref "docs/deployment/config#state-changelog-options" >}}) for other options.
+
+Changelog can also be enabled or disabled per job programmatically:
+{{< tabs  >}}
+{{< tab "Java" >}}
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableChangelogStateBackend(true);
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableChangelogStateBackend(true)
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+env = StreamExecutionEnvironment.get_execution_environment()
+env.enable_changelog_statebackend(True)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+### Monitoring
+
+Available metrics are listed [here]({{< ref "docs/ops/metrics#state-changelog" >}}).
+
+If a task is backpressured by writing state changes, it will be shown as busy (red) in the UI.
+
+### Upgrading existing jobs
+
+**Enabling Changelog**
+
+Resuming from both savepoints and checkpoints is supported:
+- given an existing non-changelog job
+- take either a [savepoint]({{< ref "docs/ops/state/savepoints#resuming-from-savepoints" >}}) or a [checkpoint]({{< ref "docs/ops/state/checkpoints#resuming-from-a-retained-checkpoint" >}})
+- alter configuration (enable Changelog)
+- resume from the taken snapshot
+
+**Disabling Changelog**
+
+Resuming only from [savepoints]({{< ref "docs/ops/state/savepoints#resuming-from-savepoints" >}})
+is supported. Resuming from [checkpoints]({{<  ref "docs/ops/state/checkpoints#resuming-from-a-retained-checkpoint" >}})
+is planned for future versions.
+
+**State migration** (including changing TTL) is currently not supported.
+
+### Limitations
+ - At most one concurrent checkpoint
+ - Local recovery not supported
+ - As of Flink 1.15, only `filesystem` changelog implementation is available
+ - State migration (including changing TTL) is currently not supported
+ - [NO_CLAIM]({{< ref "docs/deployment/config#execution-savepoint-restore-mode" >}}) mode not supported
+
 ## Migrating from Legacy Backends
 
 Beginning in **Flink 1.13**, the community reworked its public state backend classes to help users better understand the separation of local state storage and checkpoint storage.
diff --git a/docs/layouts/shortcodes/generated/fs_state_changelog_configuration.html b/docs/layouts/shortcodes/generated/fs_state_changelog_configuration.html
new file mode 100644
index 0000000..45e33c4
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/fs_state_changelog_configuration.html
@@ -0,0 +1,84 @@
+<table class="configuration table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 10%">Type</th>
+            <th class="text-left" style="width: 55%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>dstl.dfs.base-path</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Base path to store changelog files.</td>
+        </tr>
+        <tr>
+            <td><h5>dstl.dfs.batch.persist-delay</h5></td>
+            <td style="word-wrap: break-word;">10 ms</td>
+            <td>Duration</td>
+            <td>Delay before persisting changelog after receiving persist request (on checkpoint). Minimizes the number of files and requests if multiple operators (backends) or sub-tasks are using the same store. Correspondingly increases checkpoint time (async phase).</td>
+        </tr>
+        <tr>
+            <td><h5>dstl.dfs.batch.persist-size-threshold</h5></td>
+            <td style="word-wrap: break-word;">10 mb</td>
+            <td>MemorySize</td>
+            <td>Size threshold for state changes that were requested to be persisted but are waiting for dstl.dfs.batch.persist-delay (from all operators). Once reached, accumulated changes are persisted immediately. This is different from dstl.dfs.preemptive-persist-threshold as it happens AFTER the checkpoint and potentially for state changes of multiple operators. Must not exceed in-flight data limit (see below).</td>
+        </tr>
+        <tr>
+            <td><h5>dstl.dfs.compression.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to enable compression when serializing changelog.</td>
+        </tr>
+        <tr>
+            <td><h5>dstl.dfs.preemptive-persist-threshold</h5></td>
+            <td style="word-wrap: break-word;">5 mb</td>
+            <td>MemorySize</td>
+            <td>Size threshold for state changes of a single operator beyond which they are persisted pre-emptively without waiting for a checkpoint.  Improves checkpointing time by allowing quasi-continuous uploading of state changes (as opposed to uploading all accumulated changes on checkpoint).</td>
+        </tr>
+        <tr>
+            <td><h5>dstl.dfs.upload.buffer-size</h5></td>
+            <td style="word-wrap: break-word;">1 mb</td>
+            <td>MemorySize</td>
+            <td>Buffer size used when uploading change sets</td>
+        </tr>
+        <tr>
+            <td><h5>dstl.dfs.upload.max-attempts</h5></td>
+            <td style="word-wrap: break-word;">3</td>
+            <td>Integer</td>
+            <td>Maximum number of attempts (including the initial one) to perform a particular upload. Only takes effect if dstl.dfs.upload.retry-policy is fixed.</td>
+        </tr>
+        <tr>
+            <td><h5>dstl.dfs.upload.max-in-flight</h5></td>
+            <td style="word-wrap: break-word;">100 mb</td>
+            <td>MemorySize</td>
+            <td>Max amount of data allowed to be in-flight. Upon reaching this limit the task will be back-pressured.  I.e., snapshotting will block; normal processing will block if dstl.dfs.preemptive-persist-threshold is set and reached. The limit is applied to the total size of in-flight changes if multiple operators/backends are using the same changelog storage. Must be greater than or equal to dstl.dfs.batch.persist-size-threshold</td>
+        </tr>
+        <tr>
+            <td><h5>dstl.dfs.upload.next-attempt-delay</h5></td>
+            <td style="word-wrap: break-word;">500 ms</td>
+            <td>Duration</td>
+            <td>Delay before the next attempt (if the failure was not caused by a timeout).</td>
+        </tr>
+        <tr>
+            <td><h5>dstl.dfs.upload.num-threads</h5></td>
+            <td style="word-wrap: break-word;">5</td>
+            <td>Integer</td>
+            <td>Number of threads to use for upload.</td>
+        </tr>
+        <tr>
+            <td><h5>dstl.dfs.upload.retry-policy</h5></td>
+            <td style="word-wrap: break-word;">"fixed"</td>
+            <td>String</td>
+            <td>Retry policy for the failed uploads (in particular, timed out). Valid values: none, fixed.</td>
+        </tr>
+        <tr>
+            <td><h5>dstl.dfs.upload.timeout</h5></td>
+            <td style="word-wrap: break-word;">1 s</td>
+            <td>Duration</td>
+            <td>Time threshold beyond which an upload is considered timed out. If a new attempt is made but this upload succeeds earlier then this upload result will be used. May improve upload times if tail latencies of upload requests are significantly high. Only takes effect if dstl.dfs.upload.retry-policy is fixed. Please note that timeout * max_attempts should be less than execution.checkpointing.timeout</td>
+        </tr>
+    </tbody>
+</table>
diff --git a/flink-docs/pom.xml b/flink-docs/pom.xml
index 2fe39c9..c9450f3 100644
--- a/flink-docs/pom.xml
+++ b/flink-docs/pom.xml
@@ -168,7 +168,13 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
-		<dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-dstl-dfs</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
 			<!-- Used for parsing HTML -->
 			<groupId>org.jsoup</groupId>
 			<artifactId>jsoup</artifactId>
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
index 01716aa..ffc2193 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
@@ -103,7 +103,9 @@ public class ConfigOptionsDocGenerator {
                         "flink-connectors/flink-connector-pulsar",
                         "org.apache.flink.connector.pulsar.source"),
                 new OptionsClassLocation(
-                        "flink-libraries/flink-cep", "org.apache.flink.cep.configuration")
+                        "flink-libraries/flink-cep", "org.apache.flink.cep.configuration"),
+                new OptionsClassLocation(
+                        "flink-dstl/flink-dstl-dfs", "org.apache.flink.changelog.fs"),
             };
 
     static final Set<String> EXCLUSIONS =