Posted to commits@flink.apache.org by tr...@apache.org on 2019/07/25 16:32:54 UTC

[flink] branch release-1.9 updated: [FLINK-13222][docs] Add documentation for failover strategy option

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new e5e0d82  [FLINK-13222][docs] Add documentation for failover strategy option
e5e0d82 is described below

commit e5e0d822cec984a9115e1b1c4191a50ffd87e86e
Author: zhuzhu.zz <zh...@alibaba-inc.com>
AuthorDate: Mon Jul 15 18:54:20 2019 +0800

    [FLINK-13222][docs] Add documentation for failover strategy option
    
    Add detailed failover strategies documentation to doc page
    
    Point links in ZH docs to ZH pages
    
    Use a relative path in docs link to flink pages
    
    This closes #9113.
---
 .../generated/job_manager_configuration.html       |   5 +
 docs/dev/restart_strategies.zh.md                  | 270 ------------------
 ...tart_strategies.md => task_failure_recovery.md} |  66 ++++-
 docs/dev/task_failure_recovery.zh.md               | 314 +++++++++++++++++++++
 docs/redirects/restart_strategies.md               |  24 ++
 .../flink/configuration/JobManagerOptions.java     |  20 +-
 6 files changed, 418 insertions(+), 281 deletions(-)

diff --git a/docs/_includes/generated/job_manager_configuration.html b/docs/_includes/generated/job_manager_configuration.html
index 73477fe..b4ae08e 100644
--- a/docs/_includes/generated/job_manager_configuration.html
+++ b/docs/_includes/generated/job_manager_configuration.html
@@ -18,6 +18,11 @@
             <td>The maximum number of prior execution attempts kept in history.</td>
         </tr>
         <tr>
+            <td><h5>jobmanager.execution.failover-strategy</h5></td>
+            <td style="word-wrap: break-word;">"full"</td>
+            <td>This option specifies how the job computation recovers from task failures. Accepted values are:<ul><li>'full': Restarts all tasks to recover the job.</li><li>'region': Restarts all tasks that could be affected by the task failure. More details can be found <a href="../dev/task_failure_recovery.html#restart-pipelined-region-failover-strategy">here</a>.</li></ul></td>
+        </tr>
+        <tr>
             <td><h5>jobmanager.heap.size</h5></td>
             <td style="word-wrap: break-word;">"1024m"</td>
             <td>JVM heap size for the JobManager.</td>
diff --git a/docs/dev/restart_strategies.zh.md b/docs/dev/restart_strategies.zh.md
deleted file mode 100644
index 4b56187..0000000
--- a/docs/dev/restart_strategies.zh.md
+++ /dev/null
@@ -1,270 +0,0 @@
----
-title: "Restart Strategies"
-nav-parent_id: execution
-nav-pos: 50
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-Flink supports different restart strategies which control how the jobs are restarted in case of a failure.
-The cluster can be started with a default restart strategy which is always used when no job specific restart strategy has been defined.
-In case that the job is submitted with a restart strategy, this strategy overrides the cluster's default setting.
-
-* This will be replaced by the TOC
-{:toc}
-
-## Overview
-
-The default restart strategy is set via Flink's configuration file `flink-conf.yaml`.
-The configuration parameter *restart-strategy* defines which strategy is taken.
-If checkpointing is not enabled, the "no restart" strategy is used.
-If checkpointing is activated and the restart strategy has not been configured, the fixed-delay strategy is used with 
-`Integer.MAX_VALUE` restart attempts.
-See the following list of available restart strategies to learn what values are supported.
-
-Each restart strategy comes with its own set of parameters which control its behaviour.
-These values are also set in the configuration file.
-The description of each restart strategy contains more information about the respective configuration values.
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 50%">Restart Strategy</th>
-      <th class="text-left">Value for restart-strategy</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-        <td>Fixed delay</td>
-        <td>fixed-delay</td>
-    </tr>
-    <tr>
-        <td>Failure rate</td>
-        <td>failure-rate</td>
-    </tr>
-    <tr>
-        <td>No restart</td>
-        <td>none</td>
-    </tr>
-  </tbody>
-</table>
-
-Apart from defining a default restart strategy, it is possible to define for each Flink job a specific restart strategy.
-This restart strategy is set programmatically by calling the `setRestartStrategy` method on the `ExecutionEnvironment`.
-Note that this also works for the `StreamExecutionEnvironment`.
-
-The following example shows how we can set a fixed delay restart strategy for our job.
-In case of a failure the system tries to restart the job 3 times and waits 10 seconds in-between successive restart attempts.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
-  3, // number of restart attempts
-  Time.of(10, TimeUnit.SECONDS) // delay
-));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = ExecutionEnvironment.getExecutionEnvironment()
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
-  3, // number of restart attempts
-  Time.of(10, TimeUnit.SECONDS) // delay
-))
-{% endhighlight %}
-</div>
-</div>
-
-{% top %}
-
-## Restart Strategies
-
-The following sections describe restart strategy specific configuration options.
-
-### Fixed Delay Restart Strategy
-
-The fixed delay restart strategy attempts a given number of times to restart the job.
-If the maximum number of attempts is exceeded, the job eventually fails.
-In-between two consecutive restart attempts, the restart strategy waits a fixed amount of time.
-
-This strategy is enabled as default by setting the following configuration parameter in `flink-conf.yaml`.
-
-{% highlight yaml %}
-restart-strategy: fixed-delay
-{% endhighlight %}
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 40%">Configuration Parameter</th>
-      <th class="text-left" style="width: 40%">Description</th>
-      <th class="text-left">Default Value</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-        <td><code>restart-strategy.fixed-delay.attempts</code></td>
-        <td>The number of times that Flink retries the execution before the job is declared as failed.</td>
-        <td>1, or <code>Integer.MAX_VALUE</code> if activated by checkpointing</td>
-    </tr>
-    <tr>
-        <td><code>restart-strategy.fixed-delay.delay</code></td>
-        <td>Delaying the retry means that after a failed execution, the re-execution does not start immediately, but only after a certain delay. Delaying the retries can be helpful when the program interacts with external systems where for example connections or pending transactions should reach a timeout before re-execution is attempted.</td>
-        <td><code>akka.ask.timeout</code>, or 10s if activated by checkpointing</td>
-    </tr>
-  </tbody>
-</table>
-
-For example:
-
-{% highlight yaml %}
-restart-strategy.fixed-delay.attempts: 3
-restart-strategy.fixed-delay.delay: 10 s
-{% endhighlight %}
-
-The fixed delay restart strategy can also be set programmatically:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
-  3, // number of restart attempts
-  Time.of(10, TimeUnit.SECONDS) // delay
-));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = ExecutionEnvironment.getExecutionEnvironment()
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
-  3, // number of restart attempts
-  Time.of(10, TimeUnit.SECONDS) // delay
-))
-{% endhighlight %}
-</div>
-</div>
-
-{% top %}
-
-### Failure Rate Restart Strategy
-
-The failure rate restart strategy restarts job after failure, but when `failure rate` (failures per time interval) is exceeded, the job eventually fails.
-In-between two consecutive restart attempts, the restart strategy waits a fixed amount of time.
-
-This strategy is enabled as default by setting the following configuration parameter in `flink-conf.yaml`.
-
-{% highlight yaml %}
-restart-strategy: failure-rate
-{% endhighlight %}
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 40%">Configuration Parameter</th>
-      <th class="text-left" style="width: 40%">Description</th>
-      <th class="text-left">Default Value</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-        <td><it>restart-strategy.failure-rate.max-failures-per-interval</it></td>
-        <td>Maximum number of restarts in given time interval before failing a job</td>
-        <td>1</td>
-    </tr>
-    <tr>
-        <td><it>restart-strategy.failure-rate.failure-rate-interval</it></td>
-        <td>Time interval for measuring failure rate.</td>
-        <td>1 minute</td>
-    </tr>
-    <tr>
-        <td><it>restart-strategy.failure-rate.delay</it></td>
-        <td>Delay between two consecutive restart attempts</td>
-        <td><it>akka.ask.timeout</it></td>
-    </tr>
-  </tbody>
-</table>
-
-{% highlight yaml %}
-restart-strategy.failure-rate.max-failures-per-interval: 3
-restart-strategy.failure-rate.failure-rate-interval: 5 min
-restart-strategy.failure-rate.delay: 10 s
-{% endhighlight %}
-
-The failure rate restart strategy can also be set programmatically:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-env.setRestartStrategy(RestartStrategies.failureRateRestart(
-  3, // max failures per interval
-  Time.of(5, TimeUnit.MINUTES), //time interval for measuring failure rate
-  Time.of(10, TimeUnit.SECONDS) // delay
-));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = ExecutionEnvironment.getExecutionEnvironment()
-env.setRestartStrategy(RestartStrategies.failureRateRestart(
-  3, // max failures per unit
-  Time.of(5, TimeUnit.MINUTES), //time interval for measuring failure rate
-  Time.of(10, TimeUnit.SECONDS) // delay
-))
-{% endhighlight %}
-</div>
-</div>
-
-{% top %}
-
-### No Restart Strategy
-
-The job fails directly and no restart is attempted.
-
-{% highlight yaml %}
-restart-strategy: none
-{% endhighlight %}
-
-The no restart strategy can also be set programmatically:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-env.setRestartStrategy(RestartStrategies.noRestart());
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = ExecutionEnvironment.getExecutionEnvironment()
-env.setRestartStrategy(RestartStrategies.noRestart())
-{% endhighlight %}
-</div>
-</div>
-
-### Fallback Restart Strategy
-
-The cluster defined restart strategy is used. 
-This is helpful for streaming programs which enable checkpointing.
-By default, a fixed delay restart strategy is chosen if there is no other restart strategy defined.
-
-{% top %}
diff --git a/docs/dev/restart_strategies.md b/docs/dev/task_failure_recovery.md
similarity index 78%
rename from docs/dev/restart_strategies.md
rename to docs/dev/task_failure_recovery.md
index 5be430e..055e956 100644
--- a/docs/dev/restart_strategies.md
+++ b/docs/dev/task_failure_recovery.md
@@ -1,5 +1,5 @@
 ---
-title: "Restart Strategies"
+title: "Task Failure Recovery"
 nav-parent_id: execution
 nav-pos: 50
 ---
@@ -22,14 +22,19 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Flink supports different restart strategies which control how the jobs are restarted in case of a failure.
-The cluster can be started with a default restart strategy which is always used when no job specific restart strategy has been defined.
-In case that the job is submitted with a restart strategy, this strategy overrides the cluster's default setting.
+When a task failure happens, Flink needs to restart the failed task and other affected tasks to recover the job to a normal state.
+
+Restart strategies and failover strategies control how tasks are restarted.
+Restart strategies decide whether and when the failed/affected tasks can be restarted.
+Failover strategies decide which tasks should be restarted to recover the job.
 
 * This will be replaced by the TOC
 {:toc}
 
-## Overview
+## Restart Strategies
+
+The cluster can be started with a default restart strategy which is always used when no job specific restart strategy has been defined.
+In case that the job is submitted with a restart strategy, this strategy overrides the cluster's default setting.
 
 The default restart strategy is set via Flink's configuration file `flink-conf.yaml`.
 The configuration parameter *restart-strategy* defines which strategy is taken.
@@ -95,8 +100,6 @@ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
 
 {% top %}
 
-## Restart Strategies
-
 The following sections describe restart strategy specific configuration options.
 
 ### Fixed Delay Restart Strategy
@@ -267,4 +270,53 @@ The cluster defined restart strategy is used.
 This is helpful for streaming programs which enable checkpointing.
 By default, a fixed delay restart strategy is chosen if there is no other restart strategy defined.
 
+## Failover Strategies
+
+Flink supports different failover strategies which can be configured via the configuration parameter
+*jobmanager.execution.failover-strategy* in Flink's configuration file `flink-conf.yaml`.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 50%">Failover Strategy</th>
+      <th class="text-left">Value for jobmanager.execution.failover-strategy</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td>Restart all</td>
+        <td>full</td>
+    </tr>
+    <tr>
+        <td>Restart pipelined region</td>
+        <td>region</td>
+    </tr>
+  </tbody>
+</table>
+
+### Restart All Failover Strategy
+
+This strategy restarts all tasks in the job to recover from a task failure.
+
+### Restart Pipelined Region Failover Strategy
+
+This strategy groups tasks into disjoint regions. When a task failure is detected, 
+this strategy computes the smallest set of regions that must be restarted to recover from the failure. 
+For some jobs, this restarts fewer tasks than the Restart All Failover Strategy does.
+
+A region is a set of tasks that communicate via pipelined data exchanges. 
+That is, batch data exchanges denote the boundaries of a region.
+- All data exchanges in a DataStream job or Streaming Table/SQL job are pipelined.
+- All data exchanges in a Batch Table/SQL job are batched by default.
+- The data exchange types in a DataSet job are determined by the 
+  [ExecutionMode]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/api/common/ExecutionMode.html) 
+  which can be set through [ExecutionConfig]({{ site.baseurl }}/dev/execution_configuration.html).
+
+The regions to restart are determined as follows:
+1. The region containing the failed task will be restarted.
+2. If a result partition is not available while it is required by a region that will be restarted,
+   the region producing the result partition will be restarted as well.
+3. If a region is to be restarted, all of its consumer regions will also be restarted. This is to guarantee
+   data consistency because nondeterministic processing or partitioning can result in different partitions.
+
 {% top %}
diff --git a/docs/dev/task_failure_recovery.zh.md b/docs/dev/task_failure_recovery.zh.md
new file mode 100644
index 0000000..e67fb6e
--- /dev/null
+++ b/docs/dev/task_failure_recovery.zh.md
@@ -0,0 +1,314 @@
+---
+title: "Task Failure Recovery"
+nav-parent_id: execution
+nav-pos: 50
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
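+When a task fails, Flink needs to restart the failed task and any other affected tasks so that the job returns to a normal running state.
+
+Flink controls task restarts through restart strategies and failover strategies: a restart strategy decides whether a restart is allowed and how long to wait between restarts; a failover strategy decides which tasks need to be restarted.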
+
+* This will be replaced by the TOC
+{:toc}
+
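+## Restart Strategies
+
+A Flink job that defines no restart strategy uses the default restart strategy loaded when the cluster starts.
+If a restart strategy is set when the job is submitted, it overrides the cluster's default.
+
+The default restart strategy is set in Flink's configuration file `flink-conf.yaml`. The configuration parameter *restart-strategy* defines which strategy is used.
+If checkpointing is not enabled, the "no restart" strategy is used. If checkpointing is enabled and no restart strategy is configured, the fixed-delay strategy is used
+with `Integer.MAX_VALUE` restart attempts. The table below lists the available restart strategies and their configuration values.
+
+Each restart strategy has its own set of configuration parameters that control its behavior.
+These parameters are also set in the configuration file.
+The description of each restart strategy below covers its configuration options in detail.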
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 50%">Restart Strategy</th>
+      <th class="text-left">Value for restart-strategy</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td>Fixed delay</td>
+        <td>fixed-delay</td>
+    </tr>
+    <tr>
+        <td>Failure rate</td>
+        <td>failure-rate</td>
+    </tr>
+    <tr>
+        <td>No restart</td>
+        <td>none</td>
+    </tr>
+  </tbody>
+</table>
+
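+Besides the default restart strategy, a specific restart strategy can be defined for each Flink job.
+It is set programmatically by calling the `setRestartStrategy` method on the `ExecutionEnvironment`.
+This also works for the `StreamExecutionEnvironment`.
+
+The following example sets a fixed-delay restart strategy for a job.
+If a failure occurs, the system tries to restart the job 3 times and waits 10 seconds between successive restart attempts.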
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
+  3, // number of restart attempts
+  Time.of(10, TimeUnit.SECONDS) // delay
+));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment()
+env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
+  3, // number of restart attempts
+  Time.of(10, TimeUnit.SECONDS) // delay
+))
+{% endhighlight %}
+</div>
+</div>
+
+
+
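+The following sections describe the configuration options of each restart strategy.
+
+### Fixed Delay Restart Strategy
+
+The fixed delay restart strategy attempts to restart the job a given number of times.
+If the maximum number of attempts is exceeded, the job eventually fails.
+Between two consecutive restart attempts, the restart strategy waits a fixed amount of time.
+
+This strategy is enabled as the default by setting the following configuration parameter in `flink-conf.yaml`.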
+
+{% highlight yaml %}
+restart-strategy: fixed-delay
+{% endhighlight %}
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
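+      <th class="text-left" style="width: 40%">Configuration Parameter</th>
+      <th class="text-left" style="width: 40%">Description</th>
+      <th class="text-left">Default Value</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td><code>restart-strategy.fixed-delay.attempts</code></td>
+        <td>The number of times that Flink retries the execution before the job is declared failed.</td>
+        <td><code>Integer.MAX_VALUE</code> if checkpointing is enabled, otherwise 1</td>
+    </tr>
+    <tr>
+        <td><code>restart-strategy.fixed-delay.delay</code></td>
+        <td>Delaying the retry means that after a failed execution, the re-execution does not start immediately but only after a certain delay. This can help when the program interacts with external systems, for example when connections or pending transactions should time out before re-execution is attempted.</td>
+        <td>10 seconds if checkpointing is enabled, otherwise the value of <code>akka.ask.timeout</code></td>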
+    </tr>
+  </tbody>
+</table>
+
+For example:
+
+{% highlight yaml %}
+restart-strategy.fixed-delay.attempts: 3
+restart-strategy.fixed-delay.delay: 10 s
+{% endhighlight %}
+
+The fixed delay restart strategy can also be set programmatically:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
+  3, // number of restart attempts
+  Time.of(10, TimeUnit.SECONDS) // delay
+));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment()
+env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
+  3, // number of restart attempts
+  Time.of(10, TimeUnit.SECONDS) // delay
+))
+{% endhighlight %}
+</div>
+</div>
+
+
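+### Failure Rate Restart Strategy
+
+The failure rate restart strategy restarts the job after a failure, but when the **failure rate** (failures per time interval) exceeds the configured limit, the job eventually fails.
+Between two consecutive restart attempts, the restart strategy waits a fixed amount of time.
+
+This strategy is enabled as the default by setting the following configuration parameter in `flink-conf.yaml`.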
+
+{% highlight yaml %}
+restart-strategy: failure-rate
+{% endhighlight %}
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 40%">Configuration Parameter</th>
+      <th class="text-left" style="width: 40%">Description</th>
+      <th class="text-left">Default Value</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td><code>restart-strategy.failure-rate.max-failures-per-interval</code></td>
+        <td>Maximum number of restarts in a given time interval before failing the job</td>
+        <td>1</td>
+    </tr>
+    <tr>
+        <td><code>restart-strategy.failure-rate.failure-rate-interval</code></td>
+        <td>Time interval for measuring the failure rate</td>
+        <td>1 minute</td>
+    </tr>
+    <tr>
+        <td><code>restart-strategy.failure-rate.delay</code></td>
+        <td>Delay between two consecutive restart attempts</td>
+        <td><code>akka.ask.timeout</code></td>
+    </tr>
+  </tbody>
+</table>
+
+For example:
+
+{% highlight yaml %}
+restart-strategy.failure-rate.max-failures-per-interval: 3
+restart-strategy.failure-rate.failure-rate-interval: 5 min
+restart-strategy.failure-rate.delay: 10 s
+{% endhighlight %}
+
+The failure rate restart strategy can also be set programmatically:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+env.setRestartStrategy(RestartStrategies.failureRateRestart(
+  3, // max failures per interval
+  Time.of(5, TimeUnit.MINUTES), // time interval for measuring failure rate
+  Time.of(10, TimeUnit.SECONDS) // delay
+));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment()
+env.setRestartStrategy(RestartStrategies.failureRateRestart(
+  3, // max failures per interval
+  Time.of(5, TimeUnit.MINUTES), // time interval for measuring failure rate
+  Time.of(10, TimeUnit.SECONDS) // delay
+))
+{% endhighlight %}
+</div>
+</div>
+
+
+### No Restart Strategy
+
+The job fails directly and no restart is attempted.
+
+{% highlight yaml %}
+restart-strategy: none
+{% endhighlight %}
+
+The no restart strategy can also be set programmatically:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+env.setRestartStrategy(RestartStrategies.noRestart());
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment()
+env.setRestartStrategy(RestartStrategies.noRestart())
+{% endhighlight %}
+</div>
+</div>
+
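+### Fallback Restart Strategy
+
+The cluster-defined restart strategy is used.
+This is helpful for streaming programs that enable checkpointing.
+By default, a fixed delay restart strategy is chosen if no other restart strategy is defined.
+
+## Failover Strategies
+
+Flink supports different failover strategies, which are configured via the configuration parameter
+*jobmanager.execution.failover-strategy* in Flink's configuration file `flink-conf.yaml`.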
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 50%">Failover Strategy</th>
+      <th class="text-left">Value for jobmanager.execution.failover-strategy</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td>Restart all</td>
+        <td>full</td>
+    </tr>
+    <tr>
+        <td>Restart pipelined region</td>
+        <td>region</td>
+    </tr>
+  </tbody>
+</table>
+
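+### Restart All Failover Strategy
+
+With this strategy, all tasks in the job are restarted to recover from a task failure.
+
+### Restart Pipelined Region Failover Strategy
+
+This strategy groups all tasks of the job into regions. When a task fails, it tries to find the smallest set of regions that must be restarted to recover from the failure.
+Compared to the Restart All Failover Strategy, this strategy restarts fewer tasks in some scenarios.
+
+A region here is a set of tasks that exchange data in pipelined form. That is, batch data exchanges form the boundaries of a region.
+- All data exchanges in a DataStream job or streaming Table/SQL job are pipelined.
+- All data exchanges in a batch Table/SQL job are batched by default.
+- The data exchange types in a DataSet job are determined by the
+  [ExecutionMode]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/api/common/ExecutionMode.html)
+  configured in the [ExecutionConfig]({{ site.baseurl }}/zh/dev/execution_configuration.html).
+
+The regions to restart are determined as follows:
+1. The region containing the failed task needs to be restarted.
+2. If part of the data that a region to be restarted needs to consume is unavailable (lost or corrupted), the region producing that data also needs to be restarted.
+3. The downstream regions of a region to be restarted also need to be restarted. This guarantees data consistency, because nondeterministic
+   computation or partitioning can cause the same result partition to contain different data each time it is produced.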
+
+{% top %}
diff --git a/docs/redirects/restart_strategies.md b/docs/redirects/restart_strategies.md
new file mode 100644
index 0000000..7eef69e
--- /dev/null
+++ b/docs/redirects/restart_strategies.md
@@ -0,0 +1,24 @@
+---
+title: "Restart Strategies"
+layout: redirect
+redirect: /dev/task_failure_recovery.html
+permalink: /dev/restart_strategies.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 75883ba..e062829 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -23,6 +23,7 @@ import org.apache.flink.annotation.docs.Documentation;
 import org.apache.flink.configuration.description.Description;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.configuration.description.LinkElement.link;
 import static org.apache.flink.configuration.description.TextElement.text;
 
 /**
@@ -105,8 +106,16 @@ public class JobManagerOptions {
 
 	/**
 	 * This option specifies the failover strategy, i.e. how the job computation recovers from task failures.
+	 *
+	 * <p>The options "individual" and "region-legacy" are intentionally not included
+	 * as they have some known limitations or issues:
+	 * <ul>
+	 *     <li>"individual" strategy only works when no tasks are connected to each other, in which case the "region"
+	 * failover strategy would also restart failed tasks individually.
+	 *     <li>"region-legacy" strategy is not able to backtrack missing input result partitions.
+	 * </ul>
+	 * The new "region" strategy supersedes "individual" and "region-legacy" strategies and should always work.
 	 */
-	@Documentation.ExcludeFromDocumentation("The failover strategy feature is highly experimental.")
 	public static final ConfigOption<String> EXECUTION_FAILOVER_STRATEGY =
 		key("jobmanager.execution.failover-strategy")
 			.defaultValue("full")
@@ -114,9 +123,12 @@ public class JobManagerOptions {
 				.text("This option specifies how the job computation recovers from task failures. " +
 					"Accepted values are:")
 				.list(
-					text("'full': Restarts all tasks."),
-					text("'individual': Restarts only the failed task. Should only be used if all tasks are independent components."),
-					text("'region': Restarts all tasks that could be affected by the task failure.")
+					text("'full': Restarts all tasks to recover the job."),
+					text("'region': Restarts all tasks that could be affected by the task failure. " +
+						"More details can be found %s.",
+						link(
+							"../dev/task_failure_recovery.html#restart-pipelined-region-failover-strategy",
+							"here"))
 				).build());
 
 	/**
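
Editor's note: the three region restart rules documented in the new "Restart Pipelined Region Failover Strategy" section of this commit can be sketched in plain Java. This is a minimal illustration under stated assumptions, not Flink's implementation: the class `RegionFailoverSketch`, the method `regionsToRestart`, and the use of string ids for regions are hypothetical, and each region is assumed to produce a single result partition that is either available or not.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of the documented region restart rules; not Flink code. */
public class RegionFailoverSketch {

    /**
     * Computes the regions to restart after a task in failedRegion fails.
     *
     * @param producersOf        region id -> regions whose results it consumes
     * @param consumersOf        region id -> regions that consume its result
     * @param unavailableResults regions whose produced result is lost (assumption:
     *                           one result partition per region)
     */
    public static Set<String> regionsToRestart(
            String failedRegion,
            Map<String, Set<String>> producersOf,
            Map<String, Set<String>> consumersOf,
            Set<String> unavailableResults) {

        Set<String> toRestart = new LinkedHashSet<>();
        Deque<String> pending = new ArrayDeque<>();

        // Rule 1: the region containing the failed task is restarted.
        toRestart.add(failedRegion);
        pending.add(failedRegion);

        while (!pending.isEmpty()) {
            String region = pending.poll();

            // Rule 2: a producer whose result is required but unavailable is restarted too.
            for (String producer : producersOf.getOrDefault(region, Collections.emptySet())) {
                if (unavailableResults.contains(producer) && toRestart.add(producer)) {
                    pending.add(producer);
                }
            }

            // Rule 3: every consumer region of a restarted region is restarted as well.
            for (String consumer : consumersOf.getOrDefault(region, Collections.emptySet())) {
                if (toRestart.add(consumer)) {
                    pending.add(consumer);
                }
            }
        }
        return toRestart;
    }

    public static void main(String[] args) {
        // Three regions chained by batch data exchanges: A -> B -> C.
        Map<String, Set<String>> producersOf = new HashMap<>();
        producersOf.put("B", Collections.singleton("A"));
        producersOf.put("C", Collections.singleton("B"));

        Map<String, Set<String>> consumersOf = new HashMap<>();
        consumersOf.put("A", Collections.singleton("B"));
        consumersOf.put("B", Collections.singleton("C"));

        // A's result is still available, so only B and its consumer C restart.
        System.out.println(regionsToRestart(
                "B", producersOf, consumersOf, Collections.emptySet())); // [B, C]

        // A's result was lost, so A has to be re-run as well.
        System.out.println(regionsToRestart(
                "B", producersOf, consumersOf, new HashSet<>(Arrays.asList("A")))); // [B, A, C]
    }
}
```

In a job where all data exchanges are pipelined, the whole job forms a single region, so this computation yields the same result as the "full" strategy.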